Skip to content

Commit

Permalink
feat: introducing oci artifact registry impl (#2989)
Browse files Browse the repository at this point in the history
fixes #2954 

Introduces the OCI implementation of the module registry. The controller
still uses the database module registry; so to exercise the change the
`ftl release` commands have been introduced to demonstrate pushing
module artifacts to the registry and discovering them (and metadata).
The CLI implementations will change with the subsequent controller
integration work.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jonathanj-square and github-actions[bot] authored Oct 11, 2024
1 parent e563ce1 commit b4606de
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.hermit/
.vscode/*
.registry/
!/.vscode/extensions.json
!/.vscode/settings.json
!/.vscode/launch.json
Expand Down
12 changes: 10 additions & 2 deletions backend/controller/artefacts/dal_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ func (s *Service) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCl
}

func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
rows, err := s.db.GetDeploymentArtefacts(ctx, releaseID)
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}

func getDatabaseReleaseArtefacts(ctx context.Context, db sql.Querier, releaseID int64) ([]ReleaseArtefact, error) {
rows, err := db.GetDeploymentArtefacts(ctx, releaseID)
if err != nil {
return nil, fmt.Errorf("unable to get release artefacts: %w", libdal.TranslatePGError(err))
}
Expand All @@ -90,7 +94,11 @@ func (s *Service) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]R
}

func (s *Service) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
err := s.db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{
return addReleaseArtefacts(ctx, s.db, key, ra)
}

func addReleaseArtefacts(ctx context.Context, db sql.Querier, key model.DeploymentKey, ra ReleaseArtefact) error {
err := db.AssociateArtefactWithDeployment(ctx, sql.AssociateArtefactWithDeploymentParams{
Key: key,
Digest: ra.Artefact.Digest[:],
Executable: ra.Executable,
Expand Down
241 changes: 241 additions & 0 deletions backend/controller/artefacts/oci_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package artefacts

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"strings"

"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2"
"oras.land/oras-go/v2/content/memory"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"

"github.com/TBD54566975/ftl/backend/controller/artefacts/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/sha256"
)

const (
ModuleArtifactPrefix = "ftl/modules/"
)

type ContainerConfig struct {
Registry string `help:"OCI container registry host:port" env:"FTL_ARTEFACTS_REGISTRY"`
Username string `help:"OCI container registry username" env:"FTL_ARTEFACTS_USER"`
Password string `help:"OCI container registry password" env:"FTL_ARTEFACTS_PWD"`
AllowPlainHTTP bool `help:"Allows OCI container requests to accept plain HTTP responses" env:"FTL_ARTEFACTS_ALLOW_HTTP"`
}

type ContainerService struct {
host string
repoConnectionBuilder func(container string) (*remote.Repository, error)

// in the interim releases and artefacts will continue to be linked via the `deployment_artefacts` table
Handle *libdal.Handle[ContainerService]
db sql.Querier
}

type ArtefactRepository struct {
ModuleDigest sha256.SHA256
MediaType string
ArtefactType string
RepositoryDigest digest.Digest
Size int64
}

func NewContainerService(c ContainerConfig, conn libdal.Connection) *ContainerService {
// Connect the registry targeting the specified container
repoConnectionBuilder := func(path string) (*remote.Repository, error) {
ref := fmt.Sprintf("%s/%s", c.Registry, path)
reg, err := remote.NewRepository(ref)
if err != nil {
return nil, fmt.Errorf("unable to connect to container registry '%s': %w", ref, err)
}

reg.Client = &auth.Client{
Client: retry.DefaultClient,
Cache: auth.NewCache(),
Credential: auth.StaticCredential(c.Registry, auth.Credential{
Username: c.Username,
Password: c.Password,
}),
}
reg.PlainHTTP = c.AllowPlainHTTP

return reg, nil
}

return &ContainerService{
host: c.Registry,
repoConnectionBuilder: repoConnectionBuilder,
Handle: libdal.New(conn, func(h *libdal.Handle[ContainerService]) *ContainerService {
return &ContainerService{
host: c.Registry,
repoConnectionBuilder: repoConnectionBuilder,
Handle: h,
db: sql.New(h.Connection),
}
}),
}
}

func (s *ContainerService) GetDigestsKeys(ctx context.Context, digests []sha256.SHA256) (keys []ArtefactKey, missing []sha256.SHA256, err error) {
set := make(map[sha256.SHA256]bool)
for _, d := range digests {
set[d] = true
}
modules, err := s.DiscoverModuleArtefacts(ctx)
if err != nil {
return nil, nil, fmt.Errorf("unable to discover module artefacts: %w", err)
}
keys = make([]ArtefactKey, 0)
for _, m := range modules {
if set[m.ModuleDigest] {
keys = append(keys, ArtefactKey{Digest: m.ModuleDigest})
delete(set, m.ModuleDigest)
}
}
missing = make([]sha256.SHA256, 0)
for d := range set {
missing = append(missing, d)
}
return keys, missing, nil
}

func (s *ContainerService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
ref := fmt.Sprintf("ftl/modules/%s", artefact.Digest)
ms := memory.New()
mediaDescriptor := v1.Descriptor{
MediaType: "application/ftl.module.v1",
Digest: digest.NewDigestFromBytes(digest.SHA256, artefact.Digest[:]),
Size: int64(len(artefact.Content)),
}
err := ms.Push(ctx, mediaDescriptor, bytes.NewReader(artefact.Content))
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to stage artefact in memory: %w", err)
}
artifactType := "application/ftl.module.artifact"
opts := oras.PackManifestOptions{
Layers: []v1.Descriptor{mediaDescriptor},
}
tag := "latest"
manifestDescriptor, err := oras.PackManifest(ctx, ms, oras.PackManifestVersion1_1, artifactType, opts)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to pack artifact manifest: %w", err)
}
if err = ms.Tag(ctx, manifestDescriptor, tag); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to tag artifact: %w", err)
}
repo, err := s.repoConnectionBuilder(ref)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s/%s': %w", s.host, ref, err)
}
if _, err = oras.Copy(ctx, ms, tag, repo, tag, oras.DefaultCopyOptions); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to push artefact upstream from staging: %w", err)
}
return artefact.Digest, nil
}

func (s *ContainerService) Download(ctx context.Context, digest sha256.SHA256) (io.ReadCloser, error) {
ref := createModuleRepositoryPathFromDigest(digest)
registry, err := s.repoConnectionBuilder(ref)
if err != nil {
return nil, fmt.Errorf("unable to connect to registry '%s/%s': %w", s.host, ref, err)
}
_, stream, err := oras.Fetch(ctx, registry, createModuleRepositoryReferenceFromDigest(s.host, digest), oras.DefaultFetchOptions)
if err != nil {
return nil, fmt.Errorf("unable to download artefact: %w", err)
}
return stream, nil
}

func (s *ContainerService) DiscoverModuleArtefacts(ctx context.Context) ([]ArtefactRepository, error) {
return s.DiscoverArtefacts(ctx, ModuleArtifactPrefix)
}

func (s *ContainerService) DiscoverArtefacts(ctx context.Context, prefix string) ([]ArtefactRepository, error) {
registry, err := remote.NewRegistry(s.host)
if err != nil {
return nil, fmt.Errorf("unable to connect to registry '%s': %w", s.host, err)
}
registry.PlainHTTP = true
result := make([]ArtefactRepository, 0)
err = registry.Repositories(ctx, "", func(repos []string) error {
for _, path := range repos {
if !strings.HasPrefix(path, prefix) {
continue
}
d, err := getDigestFromModuleRepositoryPath(path)
if err != nil {
return fmt.Errorf("unable to get digest from repository path '%s': %w", path, err)
}
repo, err := registry.Repository(ctx, path)
if err != nil {
return fmt.Errorf("unable to connect to repository '%s': %w", path, err)
}
desc, err := repo.Resolve(ctx, "latest")
if err != nil {
return fmt.Errorf("unable to resolve module metadata '%s': %w", path, err)
}
_, data, err := oras.FetchBytes(ctx, repo, desc.Digest.String(), oras.DefaultFetchBytesOptions)
if err != nil {
return fmt.Errorf("unable to fetch module metadata '%s': %w", path, err)
}
var manifest v1.Manifest
if err := json.Unmarshal(data, &manifest); err != nil {
return fmt.Errorf("unable to unmarshal module metadata '%s': %w", path, err)
}
result = append(result, ArtefactRepository{
ModuleDigest: d,
MediaType: manifest.Layers[0].MediaType,
ArtefactType: manifest.ArtifactType,
RepositoryDigest: desc.Digest,
Size: desc.Size,
})
}
return nil
})
if err != nil {
return nil, fmt.Errorf("unable to discover artefacts: %w", err)
}
return result, nil
}

func (s *ContainerService) GetReleaseArtefacts(ctx context.Context, releaseID int64) ([]ReleaseArtefact, error) {
return getDatabaseReleaseArtefacts(ctx, s.db, releaseID)
}

func (s *ContainerService) AddReleaseArtefact(ctx context.Context, key model.DeploymentKey, ra ReleaseArtefact) error {
return addReleaseArtefacts(ctx, s.db, key, ra)
}

// createModuleRepositoryPathFromDigest creates the path to the repository, relative to the registries root
func createModuleRepositoryPathFromDigest(digest sha256.SHA256) string {
return fmt.Sprintf("%s/%s:latest", ModuleArtifactPrefix, hex.EncodeToString(digest[:]))
}

// createModuleRepositoryReferenceFromDigest creates the URL used to connect to the repository
func createModuleRepositoryReferenceFromDigest(host string, digest sha256.SHA256) string {
return fmt.Sprintf("%s/%s", host, createModuleRepositoryPathFromDigest(digest))
}

// getDigestFromModuleRepositoryPath extracts the digest from the module repository path; e.g. /ftl/modules/<digest>:latest
func getDigestFromModuleRepositoryPath(repository string) (sha256.SHA256, error) {
slash := strings.LastIndex(repository, "/")
if slash == -1 {
return sha256.SHA256{}, fmt.Errorf("unable to parse repository '%s'", repository)
}
d, err := sha256.ParseSHA256(repository[slash+1:])
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to parse repository digest '%s': %w", repository, err)
}
return d, nil
}
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ services:
environment:
SERVICES: secretsmanager
DEBUG: 1
registry:
image: registry:2
ports:
- "5001:5000"
volumes:
- ./.registry:/var/lib/registry

volumes:
grafana-storage: {}
87 changes: 87 additions & 0 deletions frontend/cli/cmd_release.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"crypto/sha256"
"fmt"

"github.com/google/uuid"

"github.com/TBD54566975/ftl/backend/controller/artefacts"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
)

type releaseCmd struct {
Registry string `help:"Registry host:port" default:"127.0.0.1:5001"`
DSN string `help:"DAL DSN." default:"postgres://127.0.0.1:15432/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"`
MaxOpenDBConnections int `help:"Maximum number of database connections." default:"20" env:"FTL_MAX_OPEN_DB_CONNECTIONS"`
MaxIdleDBConnections int `help:"Maximum number of idle database connections." default:"20" env:"FTL_MAX_IDLE_DB_CONNECTIONS"`

Publish releasePublishCmd `cmd:"" help:"Packages the project into a release and publishes it."`
List releaseListCmd `cmd:"" help:"Lists all published releases."`
}

type releasePublishCmd struct {
}

func (d *releasePublishCmd) Run(release *releaseCmd) error {
svc, err := createContainerService(release)
if err != nil {
return fmt.Errorf("failed to create container service: %w", err)
}
content := uuid.New()
contentBytes := content[:]
hash, err := svc.Upload(context.Background(), artefacts.Artefact{
Digest: sha256.Sum256(contentBytes),
Metadata: artefacts.Metadata{
Path: fmt.Sprintf("random/%s", content),
},
Content: contentBytes,
})
if err != nil {
return fmt.Errorf("failed upload artefact: %w", err)
}
fmt.Printf("Artefact published with hash: \033[31m%s\033[0m\n", hash)
return nil
}

type releaseListCmd struct {
}

func (d *releaseListCmd) Run(release *releaseCmd) error {
svc, err := createContainerService(release)
if err != nil {
return fmt.Errorf("failed to create container service: %w", err)
}
modules, err := svc.DiscoverModuleArtefacts(context.Background())
if err != nil {
return fmt.Errorf("failed to discover module artefacts: %w", err)
}
if len(modules) == 0 {
fmt.Println("No module artefacts found.")
return nil
}

format := " Digest : %s\n Size : %-7d\n Repo Digest : %s\n Media Type : %s\n Artefact Type : %s\n"
fmt.Printf("Found %d module artefacts:\n", len(modules))
for i, m := range modules {
fmt.Printf("\033[31m Artefact %d\033[0m\n", i)
fmt.Printf(format, m.ModuleDigest, m.Size, m.RepositoryDigest, m.MediaType, m.ArtefactType)
}

return nil
}

func createContainerService(release *releaseCmd) (*artefacts.ContainerService, error) {
conn, err := internalobservability.OpenDBAndInstrument(release.DSN)
if err != nil {
return nil, fmt.Errorf("failed to open DB connection: %w", err)
}
conn.SetMaxIdleConns(release.MaxIdleDBConnections)
conn.SetMaxOpenConns(release.MaxOpenDBConnections)

return artefacts.NewContainerService(artefacts.ContainerConfig{
Registry: release.Registry,
AllowPlainHTTP: true,
}, conn), nil
}
1 change: 1 addition & 0 deletions frontend/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type InteractiveCLI struct {
Secret secretCmd `cmd:"" help:"Manage secrets."`
Config configCmd `cmd:"" help:"Manage configuration."`
Pubsub pubsubCmd `cmd:"" help:"Manage pub/sub."`
Release releaseCmd `cmd:"" help:"Manage releases."`
}

type CLI struct {
Expand Down
Loading

0 comments on commit b4606de

Please sign in to comment.