Skip to content

Commit

Permalink
Refactor proxy backend uploads
Browse files Browse the repository at this point in the history
Extract duplicated code into utils/backendproxy, and pass both the
logical size and size on disk to the proxy backends when uploading.
  • Loading branch information
mostynb committed Jan 1, 2023
1 parent b899a9d commit be88dad
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 125 deletions.
1 change: 1 addition & 0 deletions cache/azblobproxy/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//cache:go_default_library",
"//cache/disk/casblob:go_default_library",
"//utils/backendproxy:go_default_library",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:go_default_library",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
Expand Down
50 changes: 18 additions & 32 deletions cache/azblobproxy/azblobproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import (
"path"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/buchgr/bazel-remote/cache"
"github.com/buchgr/bazel-remote/cache/disk/casblob"
"github.com/buchgr/bazel-remote/utils/backendproxy"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand All @@ -28,38 +31,32 @@ var (
})
)

type uploadReq struct {
hash string
size int64
kind cache.EntryKind
rc io.ReadCloser
}

type azBlobCache struct {
containerClient *azblob.ContainerClient
storageAccount string
container string
prefix string
v2mode bool
uploadQueue chan<- uploadReq
uploadQueue chan<- backendproxy.UploadReq
accessLogger cache.Logger
errorLogger cache.Logger
objectKey func(hash string, kind cache.EntryKind) string
updateTimestamps bool
}

func (c *azBlobCache) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (c *azBlobCache) Put(ctx context.Context, kind cache.EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser) {
if c.uploadQueue == nil {
rc.Close()
return
}

select {
case c.uploadQueue <- uploadReq{
hash: hash,
size: size,
kind: kind,
rc: rc,
case c.uploadQueue <- backendproxy.UploadReq{
Hash: hash,
LogicalSize: logicalSize,
SizeOnDisk: sizeOnDisk,
Kind: kind,
Rc: rc,
}:
default:
c.errorLogger.Printf("too many uploads queued\n")
Expand Down Expand Up @@ -205,26 +202,15 @@ func New(
}
}

if maxQueuedUploads > 0 && numUploaders > 0 {
uploadQueue := make(chan uploadReq, maxQueuedUploads)
for uploader := 0; uploader < numUploaders; uploader++ {
go func() {
for item := range uploadQueue {
c.uploadFile(item)
}
}()
}

c.uploadQueue = uploadQueue
}
c.uploadQueue = backendproxy.StartUploaders(c, numUploaders, maxQueuedUploads)

return c
}

func (c *azBlobCache) uploadFile(item uploadReq) {
defer item.rc.Close()
func (c *azBlobCache) UploadFile(item backendproxy.UploadReq) {
defer item.Rc.Close()

key := c.objectKey(item.hash, item.kind)
key := c.objectKey(item.Hash, item.Kind)
if c.prefix != "" {
key = c.prefix + "/" + key
}
Expand All @@ -234,7 +220,7 @@ func (c *azBlobCache) uploadFile(item uploadReq) {
return
}

_, err = client.Upload(context.Background(), item.rc.(io.ReadSeekCloser), nil)
_, err = client.Upload(context.Background(), item.Rc.(io.ReadSeekCloser), nil)

logResponse(c.accessLogger, "UPLOAD", c.storageAccount, c.container, key, err)
}
Expand Down
9 changes: 5 additions & 4 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ func (e *Error) Error() string {
type Proxy interface {

// Put makes a reasonable effort to asynchronously upload the cache
// item identified by `hash` with logical size `size`, whose data is
// readable from `rc` to the proxy backend. The data available in
// `rc` is in the same format as used by the disk.Cache instance.
// item identified by `hash` with logical size `logicalSize` and
// `sizeOnDisk` bytes on disk, whose data is readable from `rc` to
// the proxy backend. The data available in `rc` is in the same
// format as used by the disk.Cache instance.
//
// This is allowed to fail silently (for example when under heavy load).
Put(ctx context.Context, kind EntryKind, hash string, size int64, rc io.ReadCloser)
Put(ctx context.Context, kind EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser)

// Get returns an io.ReadCloser from which the cache item identified by
// `hash` can be read, its logical size, and an error if something went
Expand Down
2 changes: 1 addition & 1 deletion cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
log.Println("Failed to proxy Put:", err)
} else {
// Doesn't block, should be fast.
c.proxy.Put(ctx, kind, hash, sizeOnDisk, rc)
c.proxy.Put(ctx, kind, hash, size, sizeOnDisk, rc)
}
}

Expand Down
2 changes: 1 addition & 1 deletion cache/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestCacheGetContainsWrongSizeWithProxy(t *testing.T) {
// digest {contentsHash, contentsLength}.
type proxyStub struct{}

func (d proxyStub) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (d proxyStub) Put(ctx context.Context, kind cache.EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser) {
// Not implemented.
}

Expand Down
34 changes: 18 additions & 16 deletions cache/disk/findmissing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ type testCWProxy struct {
blob string
}

func (p *testCWProxy) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (p *testCWProxy) Put(ctx context.Context, kind cache.EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser) {
}

func (p *testCWProxy) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
return nil, -1, nil
}

func (p *testCWProxy) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
if kind == cache.CAS && hash == p.blob {
return true, 42
Expand Down Expand Up @@ -167,8 +169,8 @@ func NewProxyAdapter(cache Cache) (*proxyAdapter, error) {
}, nil
}

func (p *proxyAdapter) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
err := p.cache.Put(ctx, kind, hash, size, rc)
func (p *proxyAdapter) Put(ctx context.Context, kind cache.EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser) {
err := p.cache.Put(ctx, kind, hash, logicalSize, rc)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -212,8 +214,8 @@ func TestFindMissingCasBlobsWithProxy(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300)
_, digest4 := testutils.RandomDataAndDigest(400)

proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))

missing, err := testCache.FindMissingCasBlobs(ctx, []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -278,8 +280,8 @@ func TestFindMissingCasBlobsWithProxyFailFast(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300)
_, digest4 := testutils.RandomDataAndDigest(400)

proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))

blobs := []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -337,10 +339,10 @@ func TestFindMissingCasBlobsWithProxyFailFastNoneMissing(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300)
data4, digest4 := testutils.RandomDataAndDigest(400)

proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, io.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest4.Hash, digest4.SizeBytes, io.NopCloser(bytes.NewReader(data4)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, digest2.SizeBytes, io.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest4.Hash, digest4.SizeBytes, digest4.SizeBytes, io.NopCloser(bytes.NewReader(data4)))

blobs := []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -411,9 +413,9 @@ func TestFindMissingCasBlobsWithProxyFailFastMaxProxyBlobSize(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300) // We expect this blob to not be found.

// Put blobs directly into proxy backend, where it will not be filtered out.
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, io.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, digest2.SizeBytes, io.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, digest3.SizeBytes, io.NopCloser(bytes.NewReader(data3)))

blobs := []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -467,8 +469,8 @@ func TestFindMissingCasBlobsWithProxyMaxProxyBlobSize(t *testing.T) {
data1, digest1 := testutils.RandomDataAndDigest(100)
data2, digest2 := testutils.RandomDataAndDigest(600)

proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, io.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, digest1.SizeBytes, io.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, digest2.SizeBytes, io.NopCloser(bytes.NewReader(data2)))

missing, err := testCache.FindMissingCasBlobs(ctx, []*pb.Digest{
&digest1,
Expand Down
1 change: 1 addition & 0 deletions cache/httpproxy/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//cache:go_default_library",
"//cache/disk/casblob:go_default_library",
"//utils/backendproxy:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
],
Expand Down
61 changes: 22 additions & 39 deletions cache/httpproxy/httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,16 @@ import (

"github.com/buchgr/bazel-remote/cache"
"github.com/buchgr/bazel-remote/cache/disk/casblob"
"github.com/buchgr/bazel-remote/utils/backendproxy"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type uploadReq struct {
hash string
size int64
kind cache.EntryKind
rc io.ReadCloser
}

type remoteHTTPProxyCache struct {
remote *http.Client
baseURL string
uploadQueue chan<- uploadReq
uploadQueue chan<- backendproxy.UploadReq
accessLogger cache.Logger
errorLogger cache.Logger
requestURL func(hash string, kind cache.EntryKind) string
Expand All @@ -47,42 +41,42 @@ var (
})
)

func (r *remoteHTTPProxyCache) uploadFile(item uploadReq) {
func (r *remoteHTTPProxyCache) UploadFile(item backendproxy.UploadReq) {

if item.size == 0 {
item.rc.Close()
if item.LogicalSize == 0 {
item.Rc.Close()
// See https://github.com/golang/go/issues/20257#issuecomment-299509391
item.rc = http.NoBody
item.Rc = http.NoBody
}

url := r.requestURL(item.hash, item.kind)
url := r.requestURL(item.Hash, item.Kind)

req, err := http.NewRequestWithContext(context.Background(), http.MethodHead, url, nil)
if err != nil {
r.errorLogger.Printf("INTERNAL ERROR, FAILED TO SETUP HTTP PROXY UPLOAD %s: %s", url, err)
item.rc.Close()
item.Rc.Close()
return
}

rsp, err := r.remote.Do(req)
if err == nil && rsp.StatusCode == http.StatusOK {
r.accessLogger.Printf("SKIP UPLOAD %s", item.hash)
item.rc.Close()
r.accessLogger.Printf("SKIP UPLOAD %s", item.Hash)
item.Rc.Close()
return
}

req, err = http.NewRequestWithContext(context.Background(), http.MethodPut, url, item.rc)
req, err = http.NewRequestWithContext(context.Background(), http.MethodPut, url, item.Rc)
if err != nil {
r.errorLogger.Printf("INTERNAL ERROR, FAILED TO SETUP HTTP PROXY UPLOAD %s: %s", url, err)

// item.rc will be closed if we call req.Do(), but not if we
// item.Rc will be closed if we call req.Do(), but not if we
// return earlier.
item.rc.Close()
item.Rc.Close()

return
}
req.Header.Set("Content-Type", "application/octet-stream")
req.ContentLength = item.size
req.ContentLength = item.SizeOnDisk

rsp, err = r.remote.Do(req)
if err != nil {
Expand Down Expand Up @@ -131,19 +125,7 @@ func New(baseURL *url.URL, storageMode string, remote *http.Client,
storageMode)
}

if maxQueuedUploads > 0 && numUploaders > 0 {
uploadQueue := make(chan uploadReq, maxQueuedUploads)

for i := 0; i < numUploaders; i++ {
go func() {
for item := range uploadQueue {
proxy.uploadFile(item)
}
}()
}

proxy.uploadQueue = uploadQueue
}
proxy.uploadQueue = backendproxy.StartUploaders(proxy, numUploaders, maxQueuedUploads)

return proxy, nil
}
Expand All @@ -153,17 +135,18 @@ func logResponse(logger cache.Logger, method string, code int, url string) {
logger.Printf("HTTP %s %d %s", method, code, url)
}

func (r *remoteHTTPProxyCache) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (r *remoteHTTPProxyCache) Put(ctx context.Context, kind cache.EntryKind, hash string, logicalSize int64, sizeOnDisk int64, rc io.ReadCloser) {
if r.uploadQueue == nil {
rc.Close()
return
}

item := uploadReq{
hash: hash,
size: size,
kind: kind,
rc: rc,
item := backendproxy.UploadReq{
Hash: hash,
LogicalSize: logicalSize,
SizeOnDisk: sizeOnDisk,
Kind: kind,
Rc: rc,
}

select {
Expand Down
1 change: 1 addition & 0 deletions cache/s3proxy/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//cache:go_default_library",
"//cache/disk/casblob:go_default_library",
"//utils/backendproxy:go_default_library",
"@com_github_minio_minio_go_v7//:go_default_library",
"@com_github_minio_minio_go_v7//pkg/credentials:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
Expand Down
Loading

0 comments on commit be88dad

Please sign in to comment.