diff --git a/config/config.go b/config/config.go index dffa4d53a..dc03ef9ee 100644 --- a/config/config.go +++ b/config/config.go @@ -71,6 +71,7 @@ type Config struct { HTTPReadTimeout time.Duration `yaml:"http_read_timeout"` HTTPWriteTimeout time.Duration `yaml:"http_write_timeout"` AccessLogLevel string `yaml:"access_log_level"` + MaxBlobSize int `yaml:"max_blob_size"` // Fields that are created by combinations of the flags above. ProxyBackend cache.Proxy @@ -104,7 +105,8 @@ func newFromArgs(dir string, maxSize int, storageMode string, experimentalRemoteAssetAPI bool, httpReadTimeout time.Duration, httpWriteTimeout time.Duration, - accessLogLevel string) (*Config, error) { + accessLogLevel string, + maxBlobSize int) (*Config, error) { c := Config{ Host: host, @@ -135,6 +137,7 @@ func newFromArgs(dir string, maxSize int, storageMode string, HTTPReadTimeout: httpReadTimeout, HTTPWriteTimeout: httpWriteTimeout, AccessLogLevel: accessLogLevel, + MaxBlobSize: maxBlobSize, } err := validateConfig(&c) @@ -381,5 +384,6 @@ func get(ctx *cli.Context) (*Config, error) { ctx.Duration("http_read_timeout"), ctx.Duration("http_write_timeout"), ctx.String("access_log_level"), + ctx.Int("max_blob_size"), ) } diff --git a/main.go b/main.go index 2468b1a71..2c053116b 100644 --- a/main.go +++ b/main.go @@ -97,7 +97,7 @@ func run(ctx *cli.Context) error { validateAC := !c.DisableHTTPACValidation h := server.NewHTTPCache(diskCache, c.AccessLogger, c.ErrorLogger, validateAC, - c.EnableACKeyInstanceMangling, c.AllowUnauthenticatedReads, gitCommit) + c.EnableACKeyInstanceMangling, c.AllowUnauthenticatedReads, c.MaxBlobSize, gitCommit) var htpasswdSecrets auth.SecretProvider authMode := "disabled" @@ -206,12 +206,14 @@ func run(ctx *cli.Context) error { log.Println("experimental gRPC remote asset API:", remoteAssetStatus) checkClientCertForWrites := c.AllowUnauthenticatedReads && c.TLSCaFile != "" + maxBlobSize := c.MaxBlobSize err3 := server.ListenAndServeGRPC(addr, opts, validateAC, c.EnableACKeyInstanceMangling, enableRemoteAssetAPI, checkClientCertForWrites, + maxBlobSize, diskCache, c.AccessLogger, c.ErrorLogger) if err3 != nil { log.Fatal(err3) diff --git a/server/grpc.go b/server/grpc.go index c8bdd0635..9c5cbbbf4 100644 --- a/server/grpc.go +++ b/server/grpc.go @@ -42,6 +42,7 @@ type grpcServer struct { depsCheck bool mangleACKeys bool checkClientCertForWrites bool + maxBlobSize int } // ListenAndServeGRPC creates a new gRPC server and listens on the given @@ -52,6 +53,7 @@ func ListenAndServeGRPC(addr string, opts []grpc.ServerOption, mangleACKeys bool, enableRemoteAssetAPI bool, checkClientCertForWrites bool, + maxBlobSize int, c *disk.Cache, a cache.Logger, e cache.Logger) error { listener, err := net.Listen("tcp", addr) @@ -59,7 +61,7 @@ func ListenAndServeGRPC(addr string, opts []grpc.ServerOption, return err } - return serveGRPC(listener, opts, validateACDeps, mangleACKeys, enableRemoteAssetAPI, checkClientCertForWrites, c, a, e) + return serveGRPC(listener, opts, validateACDeps, mangleACKeys, enableRemoteAssetAPI, checkClientCertForWrites, maxBlobSize, c, a, e) } func serveGRPC(l net.Listener, opts []grpc.ServerOption, @@ -67,6 +69,7 @@ func serveGRPC(l net.Listener, opts []grpc.ServerOption, mangleACKeys bool, enableRemoteAssetAPI bool, checkClientCertForWrites bool, + maxBlobSize int, c *disk.Cache, a cache.Logger, e cache.Logger) error { srv := grpc.NewServer(opts...) @@ -75,6 +78,7 @@ func serveGRPC(l net.Listener, opts []grpc.ServerOption, depsCheck: validateACDepsCheck, mangleACKeys: mangleACKeys, checkClientCertForWrites: checkClientCertForWrites, + maxBlobSize: maxBlobSize, } pb.RegisterActionCacheServer(srv, s) pb.RegisterCapabilitiesServer(srv, s) diff --git a/server/grpc_bytestream.go b/server/grpc_bytestream.go index a2e4826b8..13ae1e60d 100644 --- a/server/grpc_bytestream.go +++ b/server/grpc_bytestream.go @@ -336,6 +336,7 @@ func (s *grpcServer) parseWriteResource(r string) (string, int64, casblob.Compre var errWriteOffset error = errors.New("bytestream writes from non-zero offsets are unsupported") var errDecoderPoolFail error = errors.New("failed to get DecoderWrapper from pool") +var errRequestObjectTooLarge error = errors.New("write request exceeds configured maximum object size") func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { @@ -411,6 +412,11 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { return } + if (s.maxBlobSize > 0 && size > int64(s.maxBlobSize) * 1024 * 1024) { + recvResult <- errRequestObjectTooLarge + return + } + resp.CommittedSize = req.WriteOffset if req.WriteOffset != 0 { err = errWriteOffset diff --git a/server/http.go b/server/http.go index a5634cc53..6d7dcc468 100644 --- a/server/http.go +++ b/server/http.go @@ -45,6 +45,7 @@ type httpCache struct { mangleACKeys bool gitCommit string checkClientCertForWrites bool + maxBlobSize int } type statusPageData struct { @@ -62,7 +63,7 @@ type statusPageData struct { // accessLogger will print one line for each HTTP request to stdout. // errorLogger will print unexpected server errors. Inexistent files and malformed URLs will not // be reported. -func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, checkClientCertForWrites bool, commit string) HTTPCache { +func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, checkClientCertForWrites bool, maxBlobSize int, commit string) HTTPCache { _, _, numItems, _ := cache.Stats() @@ -75,6 +76,7 @@ func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cach validateAC: validateAC, mangleACKeys: mangleACKeys, checkClientCertForWrites: checkClientCertForWrites, + maxBlobSize: maxBlobSize, } if commit != "{STABLE_GIT_COMMIT}" { @@ -293,6 +295,14 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { return } + if (h.maxBlobSize > 0 && contentLength > int64(h.maxBlobSize) * 1024 * 1024) { + msg := fmt.Sprintf("write request length %d exceeds configured maximum object size %d MB", + contentLength, h.maxBlobSize) + http.Error(w, msg, http.StatusBadRequest) + h.errorLogger.Printf("PUT %s: %s", path(kind, hash), msg) + return + } + zstdCompressed := false // Content-Encoding must be one of "identity", "zstd" or not present. diff --git a/utils/flags/flags.go b/utils/flags/flags.go index 70b5b5eab..193a70779 100644 --- a/utils/flags/flags.go +++ b/utils/flags/flags.go @@ -121,6 +121,13 @@ func GetCliFlags() []cli.Flag { Usage: "When using proxy backends, sets the maximum number of objects in queue for upload. If the queue is full, uploads will be skipped until the queue has space again.", EnvVars: []string{"BAZEL_REMOTE_MAX_QUEUED_UPLOADS"}, }, + &cli.Int64Flag{ + Name: "max_blob_size", + Value: 0, + Usage: "The maximum size of an individual blob stored in the cache in MiB. Upload requests will fail for any larger blobs.", + DefaultText: "0, ie unlimited", + EnvVars: []string{"BAZEL_REMOTE_MAX_BLOB_SIZE"}, + }, &cli.IntFlag{ Name: "num_uploaders", Value: 100,