Skip to content

Commit

Permalink
Added in-memory caching of blobs
Browse files Browse the repository at this point in the history
Signed-off-by: jay-dee7 <[email protected]>
  • Loading branch information
jay-dee7 committed Jun 19, 2021
1 parent 4facdc2 commit 82485cd
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 37 deletions.
12 changes: 7 additions & 5 deletions cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ type dataStore struct {
}

type Store interface {
Get(key []byte) ([]byte, error)
Set(key, value []byte) error
Update(key, value []byte) error
Get(key []byte) ([]byte, error)
ListWithPrefix(prefix []byte) ([]byte, error)
ListAll() ([]byte, error)
ListWithPrefix(prefix []byte) ([]byte, error)
Delete(key []byte) error
GetSkynetURL(key string, ref string) (string, error)
ResolveManifestRef(namespace, ref string) (string, error)
Expand All @@ -46,7 +46,7 @@ func (ds *dataStore) Metadata(ctx echo.Context) error {

val, err := ds.Get([]byte(key))
if err != nil {
ctx.String(http.StatusNotFound, err.Error())
return ctx.String(http.StatusNotFound, err.Error())
}

return ctx.JSONBlob(http.StatusOK, val)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (ds *dataStore) removeDuplicateLayers(src, dst []*types.Layer) []*types.Lay
func (ds *dataStore) ResolveManifestRef(namespace, ref string) (string, error) {
color.Yellow("key=%s ref=%s\n", namespace, ref)
var res []byte
err := ds.db.View(func(txn *badger.Txn) error {
fn := func(txn *badger.Txn) error {
item, err := txn.Get([]byte(namespace))
if err != nil {
return err
Expand All @@ -113,7 +113,9 @@ func (ds *dataStore) ResolveManifestRef(namespace, ref string) (string, error) {
copy(res, v)
return nil
})
})
}

err := ds.db.View(fn)

if err != nil {
return "", err
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func main() {
p := prometheus.NewPrometheus("echo", nil)
p.Use(e)
e.HideBanner = true
e.HidePort = true

l := setupLogger()
localCache, err := cache.New("/tmp/badger")
Expand Down Expand Up @@ -81,7 +80,7 @@ func main() {
router.Add(http.MethodPut, "/blobs/uploads/", reg.CompleteUpload)

// PUT /v2/<name>/blobs/uploads/<uuid>?digest=<digest>
router.Add(http.MethodPut, "/blobs/uploads/:reference", reg.CompleteUpload)
router.Add(http.MethodPut, "/blobs/uploads/:uuid", reg.CompleteUpload)

// PUT /v2/<name>/manifests/<reference>
router.Add(http.MethodPut, "/manifests/:reference", reg.PushManifest)
Expand All @@ -96,7 +95,7 @@ func main() {
// PATCH

// PATCH /v2/<name>/blobs/uploads/<uuid>
router.Add(http.MethodPatch, "/blobs/uploads/:buggu", reg.ChunkedUpload)
router.Add(http.MethodPatch, "/blobs/uploads/:uuid", reg.ChunkedUpload)
// router.Add(http.MethodPatch, "/blobs/uploads/", reg.ChunkedUpload)

// GET
Expand All @@ -114,6 +113,7 @@ func main() {
e.Add(http.MethodGet, "/v2/", reg.ApiVersion)

e.Start(config.Address())
// e.StartTLS(config.Address(), config.TLSCertPath, config.TLSKeyPath)

// go func() {
// if err := e.Start(config.Address()); err != nil && err != http.ErrServerClosed {
Expand Down
2 changes: 1 addition & 1 deletion parachute.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
debug: true
host: 0.0.0.0
host: 100.87.37.43
port: 5000
tls_key_path: ./certs/private_key.txt
tls_cert_path: ./certs/certificate.txt
Expand Down
12 changes: 5 additions & 7 deletions server/registry/v2/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ func (b *blobs) UploadBlob(ctx echo.Context) error {
uuid := strings.Split(ctx.Request().RequestURI, "/")[6]
// color.Magenta(strings.Split(ctx.Request().RequestURI, "/")[6])

color.Red("namespace: %s", namespace)
color.Red("uuid %s uri:=%s",uuid, ctx.Request().RequestURI)

if contentRange == "" {
if _, ok := b.uploads[uuid]; ok {
errMsg := b.errorResponse(RegistryErrorCodeBlobUploadInvalid, "stream upload after first write are not allowed", nil)
Expand All @@ -140,10 +137,11 @@ func (b *blobs) UploadBlob(ctx echo.Context) error {
locationHeader := fmt.Sprintf("/v2/%s/blobs/uploads/%s", namespace, uuid)
ctx.Response().Header().Set("Location", locationHeader)
ctx.Response().Header().Set("Range", fmt.Sprintf("0-%d", len(bz)-1))
return ctx.NoContent(http.StatusNoContent)
return ctx.NoContent(http.StatusAccepted)
}

start, end := 0, 0
// 0-90
if _, err := fmt.Sscanf(contentRange, "%d-%d", &start, &end); err != nil {
details := map[string]interface{}{
"error": "content range is invalid",
Expand All @@ -159,14 +157,14 @@ func (b *blobs) UploadBlob(ctx echo.Context) error {
return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg)
}

buf := bytes.NewBuffer(b.uploads[uuid])
io.Copy(buf, ctx.Request().Body)
buf := bytes.NewBuffer(b.uploads[uuid]) // 90
io.Copy(buf, ctx.Request().Body) // 10
defer ctx.Request().Body.Close()

b.uploads[uuid] = buf.Bytes()

locationHeader := fmt.Sprintf("/v2/%s/blobs/uploads/%s", namespace, uuid)
ctx.Response().Header().Set("Location", locationHeader)
ctx.Response().Header().Set("Range", fmt.Sprintf("0-%d", buf.Len()-1))
return ctx.NoContent(http.StatusNoContent)
return ctx.NoContent(http.StatusAccepted)
}
26 changes: 11 additions & 15 deletions server/registry/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,25 @@ func (r *registry) MonolithicUpload(ctx echo.Context) error {
func (r *registry) CompleteUpload(ctx echo.Context) error {
dig := ctx.QueryParam("digest")
namespace := ctx.Param("username") + "/" + ctx.Param("imagename")
ref := ctx.Param("reference")
uuid := ctx.Param("uuid")
// contentRange := ctx.Request().Header.Get("Content-Range")

bz, err := io.ReadAll(ctx.Request().Body)
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, err.Error(), nil)
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
}
defer ctx.Request().Body.Close()

buf := bytes.NewBuffer(r.b.uploads[ref])
io.CopyN(buf, ctx.Request().Body, ctx.Request().ContentLength)
buf := bytes.NewBuffer(r.b.uploads[uuid])
buf.Write(bz)
// io.Copy(buf, ctx.Request().Body)
// io.CopyN(buf, ctx.Request().Body, ctx.Request().ContentLength-1)

ourHash := digest(buf.Bytes())

if ourHash != dig {
details := map[string]interface{}{
"headerDigest": dig, "serverSideDigest": ourHash,
"headerDigest": dig, "serverSideDigest": ourHash, "bodyDigest": digest(bz),
}
errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, "digest mismatch", details)
r.debugf(details)
Expand Down Expand Up @@ -130,15 +131,8 @@ func (r *registry) CompleteUpload(ctx echo.Context) error {
Manifest: types.ImageManifest{
SchemaVersion: 2,
MediaType: "",
Layers: []*types.Layer{
{
MediaType: "",
Size: len(bz),
Digest: dig,
SkynetLink: skylink,
UUID: ref,
},
},
Layers: []*types.Layer{{MediaType: "", Size: len(bz), Digest: dig, SkynetLink: skylink, UUID: uuid}},
Config: types.Config{},
},
}

Expand All @@ -157,6 +151,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error {
// Docker-Content-Digest: <digest>
func (r *registry) LayerExists(ctx echo.Context) error {
return r.b.HEAD(ctx)

namespace := ctx.Param("username") + "/" + ctx.Param("imagename")
digest := ctx.Param("digest") // ref can be either tag or digest

Expand Down Expand Up @@ -349,7 +344,7 @@ func (r *registry) CancelUpload(ctx echo.Context) error {
return nil
}

// GET /v2/<name>/manifests/<reference>
// PullManifest GET /v2/<name>/manifests/<reference>
func (r *registry) PullManifest(ctx echo.Context) error {
namespace := ctx.Param("username") + "/" + ctx.Param("imagename")
ref := ctx.Param("reference")
Expand Down Expand Up @@ -544,6 +539,7 @@ func (r *registry) PullLayer(ctx echo.Context) error {
func (r *registry) BlobMount(ctx echo.Context) error {
return nil
}

func (r *registry) PushImage(ctx echo.Context) error {
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions skynet/skynet.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func NewClient(c *config.RegistryConfig) *Client {
CustomUserAgent: c.SkynetConfig.CustomUserAgent,
}

skynet.NewCustom(c.SkynetPortalURL, opts)
skynetClient := skynet.New()
skynetClient := skynet.NewCustom(c.SkynetPortalURL, opts)
//skynetClient := skynet.New()

return &Client{
skynet: &skynetClient,
Expand All @@ -30,6 +30,7 @@ func NewClient(c *config.RegistryConfig) *Client {

func (c *Client) Upload(namespace, digest string, content []byte, headers ...skynet.Header) (string, error) {
opts := skynet.DefaultUploadOptions

opts.CustomDirname = namespace

data := make(skynet.UploadData)
Expand Down
8 changes: 4 additions & 4 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func (md Metadata) Bytes() []byte {
}

func (md Metadata) FindLinkForDigest(ref string) (string, error) {
if md.Manifest.Config.Digest == ref {
return md.Manifest.Config.SkynetLink, nil
}

for _, l := range md.Manifest.Layers {
if l.Digest == ref {
return l.SkynetLink, nil
}
}

if md.Manifest.Config.Digest == ref {
return md.Manifest.Config.SkynetLink, nil
}

return "", fmt.Errorf("ref does not exists")
}

0 comments on commit 82485cd

Please sign in to comment.