Add new experimental-image-proxy hidden command

This imports the code from https://github.com/cgwalters/container-image-proxy

First, assume one is working in a codebase that isn't Go but wants
to interact with container images - we can't just include the Go containers/image
library.

The primary intended use case of this is for things like
[ostree-containers](ostreedev/ostree-rs-ext#18)
where we're using container images to encapsulate host operating system
updates, but we don't want to involve the [containers/image](github.com/containers/image/)
storage layer.

Vendoring the containers/image stack in another project is a large lift; the stripped
binary for this proxy alone weighs in at 16M (I'm sure the lack
of LTO and the overall simplicity of the Go compiler are large factors).
Anyway, I'd like to avoid shipping another copy.

This command is marked as experimental and hidden. The goal is
just to use it from the ostree stack for now, ideally shipping at least
in CentOS 9 Stream relatively soon. We can (and IMO should)
change and improve it later.

A lot more discussion in cgwalters/container-image-proxy#1
cgwalters committed Oct 9, 2021
1 parent fc81803 commit 8d51fcb
Showing 2 changed files with 356 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/skopeo/main.go
@@ -84,6 +84,7 @@ func createApp() (*cobra.Command, *globalOptions) {
copyCmd(&opts),
deleteCmd(&opts),
inspectCmd(&opts),
proxyCmd(&opts),
layersCmd(&opts),
loginCmd(&opts),
logoutCmd(&opts),
355 changes: 355 additions & 0 deletions cmd/skopeo/proxy.go
@@ -0,0 +1,355 @@
package main

/*
This code is currently only intended to be used by ostree
to fetch content via containers. The API is subject
to change. A goal however is to stabilize the API
eventually as a full out-of-process interface to the
core containers/image library functionality.
To use this command, in a parent process create a
`socketpair()` of type `SOCK_SEQPACKET`. Fork
off this command, and pass one half of the socket
pair to the child, using `--sockfd` to specify
the fd number.
The protocol is JSON for the control layer,
and a read side of a `pipe()` passed for large data.
*/
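// As an illustrative aid (an editor's sketch, not part of the upstream
// protocol documentation; the pipeid value 7 is arbitrary), a control
// exchange for fetching a manifest might look like:
//
//   client -> proxy: {"method": "GetManifest", "args": []}
//   proxy -> client: {"success": true, "value": "sha256:...", "pipeid": 7, "error": ""}
//
// The reply datagram also carries the read half of a pipe via SCM_RIGHTS;
// the serialized OCI manifest is streamed over that pipe, and the client
// then sends {"method": "FinishPipe", "args": [7]} to wait for the copy
// to complete and to collect any error.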

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"sync"
"syscall"

"github.com/containers/image/v5/image"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache"
"github.com/containers/image/v5/transports/alltransports"
"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
"github.com/spf13/cobra"
)


// A JSON request
type request struct {
Method string `json:"method"`
Args []interface{} `json:"args"`
}

// Like Rust's Result<T>, though we explicitly
// represent the success status to be doubly sure.
type reply struct {
Success bool `json:"success"`
Value interface{} `json:"value"`
PipeID uint32 `json:"pipeid"`
Error string `json:"error"`
}

// Our internal deserialization of reply plus optional fd
type replyBuf struct {
value interface{}
fd *os.File
pipeid uint32
}

type activePipe struct {
w *os.File
wg sync.WaitGroup
err error
}

type proxyHandler struct {
lock sync.Mutex
imageref string
sysctx *types.SystemContext
cache types.BlobInfoCache
imgsrc *types.ImageSource
img *types.Image
activePipes map[uint32]*activePipe
}

func (h *proxyHandler) ensureImage() error {
if h.img != nil {
return nil
}
imgRef, err := alltransports.ParseImageName(h.imageref)
if err != nil {
return err
}
imgsrc, err := imgRef.NewImageSource(context.Background(), h.sysctx)
if err != nil {
return err
}
img, err := image.FromUnparsedImage(context.Background(), h.sysctx, image.UnparsedInstance(imgsrc, nil))
if err != nil {
return fmt.Errorf("failed to load image: %w", err)
}
h.img = &img
h.imgsrc = &imgsrc
return nil
}

func (h *proxyHandler) GetManifest(args []interface{}) (replyBuf, error) {
h.lock.Lock()
defer h.lock.Unlock()

var ret replyBuf

if len(args) != 0 {
return ret, fmt.Errorf("invalid request, expecting zero arguments")
}

if err := h.ensureImage(); err != nil {
return ret, err
}

ctx := context.TODO()
rawManifest, _, err := (*h.img).Manifest(ctx)
if err != nil {
return ret, err
}
digest, err := manifest.Digest(rawManifest)
if err != nil {
return ret, err
}
ociManifest, err := manifest.OCI1FromManifest(rawManifest)
if err != nil {
return ret, err
}
ociSerialized, err := ociManifest.Serialize()
if err != nil {
return ret, err
}

piper, pipew, err := os.Pipe()
if err != nil {
return ret, err
}
f := activePipe{
w: pipew,
}
h.activePipes[uint32(pipew.Fd())] = &f
f.wg.Add(1)
go func() {
// Signal completion when we return
defer f.wg.Done()
_, err = io.Copy(f.w, bytes.NewReader(ociSerialized))
if err != nil {
f.err = err
}
}()

r := replyBuf{
value: digest.String(),
fd: piper,
pipeid: uint32(pipew.Fd()),
}
return r, nil
}

func (h *proxyHandler) GetBlob(args []interface{}) (replyBuf, error) {
h.lock.Lock()
defer h.lock.Unlock()

var ret replyBuf

if len(args) != 1 {
return ret, fmt.Errorf("invalid request, expecting one blobid")
}

digestStr, ok := args[0].(string)
if !ok {
return ret, fmt.Errorf("expecting string blobid")
}

if err := h.ensureImage(); err != nil {
return ret, err
}

piper, pipew, err := os.Pipe()
if err != nil {
return ret, err
}

ctx := context.TODO()
d, err := digest.Parse(digestStr)
if err != nil {
return ret, err
}
blobr, blobSize, err := (*h.imgsrc).GetBlob(ctx, types.BlobInfo{Digest: d, Size: -1}, h.cache)
if err != nil {
return ret, err
}

f := activePipe{
w: pipew,
}
h.activePipes[uint32(pipew.Fd())] = &f

f.wg.Add(1)
go func() {
// Signal completion when we return
defer f.wg.Done()
verifier := d.Verifier()
tr := io.TeeReader(blobr, verifier)
_, err = io.Copy(f.w, tr)
if err != nil {
f.err = err
return
}
if !verifier.Verified() {
f.err = fmt.Errorf("corrupted blob, expecting %s", d.String())
}
}()

ret.value = blobSize
ret.fd = piper
ret.pipeid = uint32(pipew.Fd())
return ret, nil
}

func (h *proxyHandler) FinishPipe(args []interface{}) (replyBuf, error) {
h.lock.Lock()
defer h.lock.Unlock()

var ret replyBuf

if len(args) != 1 {
return ret, fmt.Errorf("finishpipe: expecting one pipeid argument")
}
pipeidf, ok := args[0].(float64)
if !ok {
return ret, fmt.Errorf("finishpipe: expecting pipeid, not %T", args[0])
}
pipeid := uint32(pipeidf)

f, ok := h.activePipes[pipeid]
if !ok {
return ret, fmt.Errorf("finishpipe: no active pipe %d", pipeid)
}

f.wg.Wait()
f.w.Close()
err := f.err
delete(h.activePipes, pipeid)
return ret, err
}

func (buf replyBuf) send(conn *net.UnixConn, err error) error {
replyToSerialize := reply{
Success: err == nil,
Value: buf.value,
PipeID: buf.pipeid,
}
if err != nil {
replyToSerialize.Error = err.Error()
}
serializedReply, err := json.Marshal(&replyToSerialize)
if err != nil {
return err
}
defer func() {
if buf.fd != nil {
buf.fd.Close()
}
}()
fds := make([]int, 0)
if buf.fd != nil {
fds = append(fds, int(buf.fd.Fd()))
}
oob := syscall.UnixRights(fds...)
n, oobn, err := conn.WriteMsgUnix(serializedReply, oob, nil)
if err != nil {
return err
}
if n != len(serializedReply) || oobn != len(oob) {
return io.ErrShortWrite
}
return nil
}

type proxyOptions struct {
global *globalOptions
sockFd int
}

func proxyCmd(global *globalOptions) *cobra.Command {
opts := proxyOptions{global: global}
cmd := &cobra.Command{
Use: "experimental-image-proxy [command options] IMAGE",
Short: "Interactive proxy for fetching container images (EXPERIMENTAL)",
Long: `Run skopeo as a proxy, supporting HTTP requests to fetch manifests and blobs.`,
RunE: commandAction(opts.run),
Args: cobra.ExactArgs(1),
// Not stabilized yet
Hidden: true,
Example: `skopeo experimental-image-proxy --sockfd 3`,
}
adjustUsage(cmd)
flags := cmd.Flags()
flags.IntVar(&opts.sockFd, "sockfd", -1, "Serve on opened socket pair")
cmd.MarkFlagRequired("sockfd")
return cmd
}

// Implementation of skopeo experimental-image-proxy
func (opts *proxyOptions) run(args []string, stdout io.Writer) error {
sysCtx := opts.global.newSystemContext()

handler := &proxyHandler{
imageref: args[0],
sysctx: sysCtx,
cache: blobinfocache.DefaultCache(sysCtx),
activePipes: make(map[uint32]*activePipe),
}

fd := os.NewFile(uintptr(opts.sockFd), "sock")
fconn, err := net.FileConn(fd)
if err != nil {
return err
}
conn := fconn.(*net.UnixConn)

buf := make([]byte, 32*1024)
out:
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
if errors.Is(err, io.EOF) {
break out
}
return fmt.Errorf("reading socket: %v", err)
}
readbuf := buf[0:n]
var req request
if err := json.Unmarshal(readbuf, &req); err != nil {
rb := replyBuf{}
rb.send(conn, fmt.Errorf("invalid request: %v", err))
continue
}
switch req.Method {
case "GetManifest":
{
rb, err := handler.GetManifest(req.Args)
rb.send(conn, err)
}
case "GetBlob":
{
rb, err := handler.GetBlob(req.Args)
rb.send(conn, err)
}
case "FinishPipe":
{
rb, err := handler.FinishPipe(req.Args)
rb.send(conn, err)
}
case "Shutdown":
break out
default:
rb := replyBuf{}
rb.send(conn, fmt.Errorf("unknown method: %s", req.Method))
}
}

return nil
}
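
For context, the following is a minimal client sketch for this protocol (an editor's illustration, not part of this commit; the image reference, the fd number 3, and the buffer sizes are arbitrary, and error handling is abbreviated). It creates a SOCK_SEQPACKET socketpair on Linux, spawns `skopeo experimental-image-proxy --sockfd 3` with one end inherited as fd 3 in the child, issues GetManifest, reads the JSON reply plus the pipe fd passed via SCM_RIGHTS, and then sends FinishPipe and Shutdown.

// client_sketch.go - illustrative only, Linux-specific, abbreviated error handling.
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "os"
    "os/exec"
    "syscall"
)

func main() {
    // One end stays with us; the other is inherited by skopeo as fd 3.
    fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET|syscall.SOCK_CLOEXEC, 0)
    if err != nil {
        panic(err)
    }
    mySock := os.NewFile(uintptr(fds[0]), "proxy-client")
    theirSock := os.NewFile(uintptr(fds[1]), "proxy-server")

    // The image reference here is a placeholder.
    cmd := exec.Command("skopeo", "experimental-image-proxy", "--sockfd", "3",
        "docker://quay.io/example/someimage:latest")
    cmd.ExtraFiles = []*os.File{theirSock} // becomes fd 3 in the child
    cmd.Stderr = os.Stderr
    if err := cmd.Start(); err != nil {
        panic(err)
    }
    theirSock.Close()

    // Each request is one JSON datagram on the SOCK_SEQPACKET socket.
    send := func(method string, args ...interface{}) {
        b, _ := json.Marshal(map[string]interface{}{"method": method, "args": args})
        if _, err := mySock.Write(b); err != nil {
            panic(err)
        }
    }

    send("GetManifest")

    // Read the reply datagram plus the pipe fd passed via SCM_RIGHTS.
    buf := make([]byte, 32*1024)
    oob := make([]byte, 1024)
    n, oobn, _, _, err := syscall.Recvmsg(int(mySock.Fd()), buf, oob, 0)
    if err != nil {
        panic(err)
    }
    var rep struct {
        Success bool        `json:"success"`
        Value   interface{} `json:"value"`
        PipeID  uint32      `json:"pipeid"`
        Error   string      `json:"error"`
    }
    if err := json.Unmarshal(buf[:n], &rep); err != nil || !rep.Success {
        panic(fmt.Sprintf("GetManifest failed: %v %s", err, rep.Error))
    }
    scms, _ := syscall.ParseSocketControlMessage(oob[:oobn])
    pipeFds, _ := syscall.ParseUnixRights(&scms[0])
    pipe := os.NewFile(uintptr(pipeFds[0]), "manifest-pipe")

    // Stream the manifest, then tell the proxy we are done with the pipe.
    man, _ := io.ReadAll(pipe)
    send("FinishPipe", rep.PipeID)
    // (A real client would also read the FinishPipe reply to surface copy errors.)

    send("Shutdown")
    cmd.Wait()
    fmt.Printf("digest %v, manifest %d bytes\n", rep.Value, len(man))
}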
