Skip to content

Commit

Permalink
Add --max_blob_size flag
Browse files Browse the repository at this point in the history
This causes both a gRPC and HTTP endpoint to reject blob writes that are larger than the configured size (in MB)

Fixes buchgr#440
  • Loading branch information
alexeagle committed May 24, 2021
1 parent d45c53e commit ffdb70f
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 4 deletions.
6 changes: 5 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Config struct {
HTTPReadTimeout time.Duration `yaml:"http_read_timeout"`
HTTPWriteTimeout time.Duration `yaml:"http_write_timeout"`
AccessLogLevel string `yaml:"access_log_level"`
MaxBlobSize int `yaml:"max_blob_size"`

// Fields that are created by combinations of the flags above.
ProxyBackend cache.Proxy
Expand Down Expand Up @@ -104,7 +105,8 @@ func newFromArgs(dir string, maxSize int, storageMode string,
experimentalRemoteAssetAPI bool,
httpReadTimeout time.Duration,
httpWriteTimeout time.Duration,
accessLogLevel string) (*Config, error) {
accessLogLevel string,
maxBlobSize int) (*Config, error) {

c := Config{
Host: host,
Expand Down Expand Up @@ -135,6 +137,7 @@ func newFromArgs(dir string, maxSize int, storageMode string,
HTTPReadTimeout: httpReadTimeout,
HTTPWriteTimeout: httpWriteTimeout,
AccessLogLevel: accessLogLevel,
MaxBlobSize: maxBlobSize,
}

err := validateConfig(&c)
Expand Down Expand Up @@ -381,5 +384,6 @@ func get(ctx *cli.Context) (*Config, error) {
ctx.Duration("http_read_timeout"),
ctx.Duration("http_write_timeout"),
ctx.String("access_log_level"),
ctx.Int("max_blob_size"),
)
}
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func run(ctx *cli.Context) error {

validateAC := !c.DisableHTTPACValidation
h := server.NewHTTPCache(diskCache, c.AccessLogger, c.ErrorLogger, validateAC,
c.EnableACKeyInstanceMangling, c.AllowUnauthenticatedReads, gitCommit)
c.EnableACKeyInstanceMangling, c.AllowUnauthenticatedReads, c.MaxBlobSize, gitCommit)

var htpasswdSecrets auth.SecretProvider
authMode := "disabled"
Expand Down Expand Up @@ -206,12 +206,14 @@ func run(ctx *cli.Context) error {
log.Println("experimental gRPC remote asset API:", remoteAssetStatus)

checkClientCertForWrites := c.AllowUnauthenticatedReads && c.TLSCaFile != ""
maxBlobSize := c.MaxBlobSize

err3 := server.ListenAndServeGRPC(addr, opts,
validateAC,
c.EnableACKeyInstanceMangling,
enableRemoteAssetAPI,
checkClientCertForWrites,
maxBlobSize,
diskCache, c.AccessLogger, c.ErrorLogger)
if err3 != nil {
log.Fatal(err3)
Expand Down
6 changes: 5 additions & 1 deletion server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type grpcServer struct {
depsCheck bool
mangleACKeys bool
checkClientCertForWrites bool
maxBlobSize int
}

// ListenAndServeGRPC creates a new gRPC server and listens on the given
Expand All @@ -52,21 +53,23 @@ func ListenAndServeGRPC(addr string, opts []grpc.ServerOption,
mangleACKeys bool,
enableRemoteAssetAPI bool,
checkClientCertForWrites bool,
maxBlobSize int,
c *disk.Cache, a cache.Logger, e cache.Logger) error {

listener, err := net.Listen("tcp", addr)
if err != nil {
return err
}

return serveGRPC(listener, opts, validateACDeps, mangleACKeys, enableRemoteAssetAPI, checkClientCertForWrites, c, a, e)
return serveGRPC(listener, opts, validateACDeps, mangleACKeys, enableRemoteAssetAPI, checkClientCertForWrites, maxBlobSize, c, a, e)
}

func serveGRPC(l net.Listener, opts []grpc.ServerOption,
validateACDepsCheck bool,
mangleACKeys bool,
enableRemoteAssetAPI bool,
checkClientCertForWrites bool,
maxBlobSize int,
c *disk.Cache, a cache.Logger, e cache.Logger) error {

srv := grpc.NewServer(opts...)
Expand All @@ -75,6 +78,7 @@ func serveGRPC(l net.Listener, opts []grpc.ServerOption,
depsCheck: validateACDepsCheck,
mangleACKeys: mangleACKeys,
checkClientCertForWrites: checkClientCertForWrites,
maxBlobSize: maxBlobSize,
}
pb.RegisterActionCacheServer(srv, s)
pb.RegisterCapabilitiesServer(srv, s)
Expand Down
6 changes: 6 additions & 0 deletions server/grpc_bytestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func (s *grpcServer) parseWriteResource(r string) (string, int64, casblob.Compre

var errWriteOffset error = errors.New("bytestream writes from non-zero offsets are unsupported")
var errDecoderPoolFail error = errors.New("failed to get DecoderWrapper from pool")
var errRequestObjectTooLarge error = errors.New("write request exceeds configured maximum object size")

func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {

Expand Down Expand Up @@ -411,6 +412,11 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
return
}

if (s.maxBlobSize > 0 && size > int64(s.maxBlobSize) * 1024 * 1024) {
recvResult <- errRequestObjectTooLarge
return
}

resp.CommittedSize = req.WriteOffset
if req.WriteOffset != 0 {
err = errWriteOffset
Expand Down
12 changes: 11 additions & 1 deletion server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type httpCache struct {
mangleACKeys bool
gitCommit string
checkClientCertForWrites bool
maxBlobSize int
}

type statusPageData struct {
Expand All @@ -62,7 +63,7 @@ type statusPageData struct {
// accessLogger will print one line for each HTTP request to stdout.
// errorLogger will print unexpected server errors. Inexistent files and malformed URLs will not
// be reported.
func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, checkClientCertForWrites bool, commit string) HTTPCache {
func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, checkClientCertForWrites bool, maxBlobSize int, commit string) HTTPCache {

_, _, numItems, _ := cache.Stats()

Expand All @@ -75,6 +76,7 @@ func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cach
validateAC: validateAC,
mangleACKeys: mangleACKeys,
checkClientCertForWrites: checkClientCertForWrites,
maxBlobSize: maxBlobSize,
}

if commit != "{STABLE_GIT_COMMIT}" {
Expand Down Expand Up @@ -293,6 +295,14 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) {
return
}

if (h.maxBlobSize > 0 && contentLength > int64(h.maxBlobSize) * 1024 * 1024) {
msg := fmt.Sprintf("write request length %d exceeds configured maximum object size %d MB",
contentLength, h.maxBlobSize)
http.Error(w, msg, http.StatusBadRequest)
h.errorLogger.Printf("PUT %s: %s", path(kind, hash), msg)
return
}

zstdCompressed := false

// Content-Encoding must be one of "identity", "zstd" or not present.
Expand Down
7 changes: 7 additions & 0 deletions utils/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func GetCliFlags() []cli.Flag {
Usage: "When using proxy backends, sets the maximum number of objects in queue for upload. If the queue is full, uploads will be skipped until the queue has space again.",
EnvVars: []string{"BAZEL_REMOTE_MAX_QUEUED_UPLOADS"},
},
&cli.Int64Flag{
Name: "max_blob_size",
Value: 0,
Usage: "The maximum size of an individual blob stored in the cache in MiB. Upload requests will fail for any larger blobs.",
DefaultText: "0, ie unlimited",
EnvVars: []string{"BAZEL_REMOTE_MAX_BLOB_SIZE"},
},
&cli.IntFlag{
Name: "num_uploaders",
Value: 100,
Expand Down

0 comments on commit ffdb70f

Please sign in to comment.