From 0c51d92f417dc963fc7f562ceb8cce8ed773b0ab Mon Sep 17 00:00:00 2001 From: abushwang Date: Fri, 6 Dec 2024 15:52:32 +0800 Subject: [PATCH] Add fuse-manager Signed-off-by: abushwang Co-authored-by: Zuti He --- Makefile | 5 +- cmd/containerd-stargz-grpc/fsopts/fsopts.go | 81 +++ cmd/containerd-stargz-grpc/main.go | 187 +++--- cmd/go.mod | 2 +- cmd/stargz-fuse-manager/main.go | 57 ++ docs/overview.md | 8 + fs/fs.go | 20 +- fusemanager/api/api.pb.go | 566 ++++++++++++++++++ fusemanager/api/api.proto | 58 ++ fusemanager/api/generate.go | 19 + fusemanager/client.go | 141 +++++ fusemanager/fusemanager.go | 259 ++++++++ fusemanager/fusestore.go | 123 ++++ fusemanager/service.go | 330 ++++++++++ go.mod | 4 +- .../keychain/keychainconfig/keychainconfig.go | 88 +++ service/service.go | 39 +- 17 files changed, 1863 insertions(+), 124 deletions(-) create mode 100644 cmd/containerd-stargz-grpc/fsopts/fsopts.go create mode 100644 cmd/stargz-fuse-manager/main.go create mode 100644 fusemanager/api/api.pb.go create mode 100644 fusemanager/api/api.proto create mode 100644 fusemanager/api/generate.go create mode 100644 fusemanager/client.go create mode 100644 fusemanager/fusemanager.go create mode 100644 fusemanager/fusestore.go create mode 100644 fusemanager/service.go create mode 100644 service/keychain/keychainconfig/keychainconfig.go diff --git a/Makefile b/Makefile index ce37ad087..c5ec50b4b 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ REVISION=$(shell git rev-parse HEAD)$(shell if ! git diff --no-ext-diff --quiet GO_BUILD_LDFLAGS ?= -s -w GO_LD_FLAGS=-ldflags '$(GO_BUILD_LDFLAGS) -X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) $(GO_EXTRA_LDFLAGS)' -CMD=containerd-stargz-grpc ctr-remote stargz-store +CMD=containerd-stargz-grpc ctr-remote stargz-store stargz-fuse-manager CMD_BINARIES=$(addprefix $(PREFIX),$(CMD)) @@ -48,6 +48,9 @@ stargz-store: FORCE stargz-store-helper: FORCE cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-store/helper +stargz-fuse-manager: FORCE + cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-fuse-manager + check: @echo "$@" @GO111MODULE=$(GO111MODULE_VALUE) $(shell go env GOPATH)/bin/golangci-lint run diff --git a/cmd/containerd-stargz-grpc/fsopts/fsopts.go b/cmd/containerd-stargz-grpc/fsopts/fsopts.go new file mode 100644 index 000000000..01a09049b --- /dev/null +++ b/cmd/containerd-stargz-grpc/fsopts/fsopts.go @@ -0,0 +1,81 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fsopts + +import ( + "context" + "fmt" + "io" + "path/filepath" + + "github.com/containerd/log" + dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db" + ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs" + "github.com/containerd/stargz-snapshotter/fs" + "github.com/containerd/stargz-snapshotter/metadata" + memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory" + bolt "go.etcd.io/bbolt" +) + +type Config struct { + EnableIpfs bool + MetadataStore string +} + +const ( + memoryMetadataType = "memory" + dbMetadataType = "db" +) + +func ConfigFsOpts(ctx context.Context, rootDir string, config *Config) ([]fs.Option, error) { + fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)} + + if config.EnableIpfs { + fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler))) + } + + mt, err := getMetadataStore(rootDir, config) + if err != nil { + return nil, fmt.Errorf("failed to configure metadata store: %w", err) + } + fsOpts = append(fsOpts, fs.WithMetadataStore(mt)) + + return fsOpts, nil +} + +func getMetadataStore(rootDir string, config *Config) (metadata.Store, error) { + switch config.MetadataStore { + case "", memoryMetadataType: + return memorymetadata.NewReader, nil + case dbMetadataType: + bOpts := bolt.Options{ + NoFreelistSync: true, + InitialMmapSize: 64 * 1024 * 1024, + FreelistType: bolt.FreelistMapType, + } + db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts) + if err != nil { + return nil, err + } + return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) { + return dbmetadata.NewReader(db, sr, opts...) + }, nil + default: + return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v", + config.MetadataStore, memoryMetadataType, dbMetadataType) + } +} diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go index ff078ab2c..531ebefb2 100644 --- a/cmd/containerd-stargz-grpc/main.go +++ b/cmd/containerd-stargz-grpc/main.go @@ -20,12 +20,12 @@ import ( "context" "flag" "fmt" - "io" golog "log" "math/rand" "net" "net/http" "os" + "os/exec" "os/signal" "path/filepath" "time" @@ -33,30 +33,19 @@ import ( snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1" "github.com/containerd/containerd/v2/contrib/snapshotservice" "github.com/containerd/containerd/v2/core/snapshots" - "github.com/containerd/containerd/v2/defaults" - "github.com/containerd/containerd/v2/pkg/dialer" "github.com/containerd/containerd/v2/pkg/sys" "github.com/containerd/log" - dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db" - ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs" - "github.com/containerd/stargz-snapshotter/fs" - "github.com/containerd/stargz-snapshotter/metadata" - memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory" + "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts" + "github.com/containerd/stargz-snapshotter/fusemanager" "github.com/containerd/stargz-snapshotter/service" - "github.com/containerd/stargz-snapshotter/service/keychain/cri" - "github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig" - "github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig" - "github.com/containerd/stargz-snapshotter/service/resolver" + "github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig" + snbase "github.com/containerd/stargz-snapshotter/snapshot" "github.com/containerd/stargz-snapshotter/version" sddaemon "github.com/coreos/go-systemd/v22/daemon" metrics "github.com/docker/go-metrics" "github.com/pelletier/go-toml" - bolt "go.etcd.io/bbolt" "golang.org/x/sys/unix" "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/credentials/insecure" - runtime "k8s.io/cri-api/pkg/apis/runtime/v1" ) const ( @@ -65,14 +54,19 @@ const ( defaultLogLevel = log.InfoLevel defaultRootDir = "/var/lib/containerd-stargz-grpc" defaultImageServiceAddress = "/run/containerd/containerd.sock" + defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-manager.sock" + + fuseManagerBin = "stargz-fuse-manager" + fuseManagerAddress = "fuse-manager.sock" ) var ( - address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server") - configPath = flag.String("config", defaultConfigPath, "path to the configuration file") - logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]") - rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter") - printVersion = flag.Bool("version", false, "print the version") + address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server") + configPath = flag.String("config", defaultConfigPath, "path to the configuration file") + logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]") + rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter") + detachFuseManager = flag.Bool("detach-fuse-manager", false, "whether detach fusemanager or not") + printVersion = flag.Bool("version", false, "print the version") ) type snapshotterConfig struct { @@ -92,6 +86,11 @@ type snapshotterConfig struct { // MetadataStore is the type of the metadata store to use. MetadataStore string `toml:"metadata_store" default:"memory"` + // FuseManagerAddress is address for the fusemanager's GRPC server + FuseManagerAddress string `toml:"fusemanager_address"` + + // FuseManagerPath is path to the fusemanager's executable + FuseManagerPath string `toml:"fusemanager_path"` } func main() { @@ -140,44 +139,73 @@ func main() { } // Configure keychain - credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)} - if config.Config.KubeconfigKeychainConfig.EnableKeychain { - var opts []kubeconfig.Option - if kcp := config.Config.KubeconfigKeychainConfig.KubeconfigPath; kcp != "" { - opts = append(opts, kubeconfig.WithKubeconfigPath(kcp)) - } - credsFuncs = append(credsFuncs, kubeconfig.NewKubeconfigKeychain(ctx, opts...)) + keyChainConfig := keychainconfig.Config{ + EnableKubeKeychain: config.Config.KubeconfigKeychainConfig.EnableKeychain, + EnableCRIKeychain: config.Config.CRIKeychainConfig.EnableKeychain, + KubeconfigPath: config.Config.KubeconfigKeychainConfig.KubeconfigPath, + DefaultImageServiceAddress: defaultImageServiceAddress, + ImageServicePath: config.CRIKeychainConfig.ImageServicePath, } - if config.Config.CRIKeychainConfig.EnableKeychain { - // connects to the backend CRI service (defaults to containerd socket) - criAddr := defaultImageServiceAddress - if cp := config.CRIKeychainConfig.ImageServicePath; cp != "" { - criAddr = cp - } - connectCRI := func() (runtime.ImageServiceClient, error) { - conn, err := newCRIConn(criAddr) + + var rs snapshots.Snapshotter + if *detachFuseManager { + fmPath := config.FuseManagerPath + if fmPath == "" { + var err error + fmPath, err = exec.LookPath(fuseManagerBin) if err != nil { - return nil, err + log.G(ctx).WithError(err).Fatalf("failed to find fusemanager bin") } - return runtime.NewImageServiceClient(conn), nil } - f, criServer := cri.NewCRIKeychain(ctx, connectCRI) - runtime.RegisterImageServiceServer(rpc, criServer) - credsFuncs = append(credsFuncs, f) - } - fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)} - if config.IPFS { - fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler))) - } - mt, err := getMetadataStore(*rootDir, config) - if err != nil { - log.G(ctx).WithError(err).Fatalf("failed to configure metadata store") - } - fsOpts = append(fsOpts, fs.WithMetadataStore(mt)) - rs, err := service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config, - service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...)) - if err != nil { - log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter") + fmAddr := config.FuseManagerAddress + if fmAddr == "" { + fmAddr = defaultFuseManagerAddress + } + + if !filepath.IsAbs(fmAddr) { + log.G(ctx).WithError(err).Fatalf("fuse manager address must be an absolute path: %s", fmAddr) + } + err := fusemanager.StartFuseManager(ctx, fmPath, fmAddr, filepath.Join(*rootDir, "fusestore.db"), *logLevel, filepath.Join(*rootDir, "stargz-fuse-manager.log")) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to start fusemanager") + } + + fuseManagerConfig := fusemanager.Config{ + Config: &config.Config, + IPFS: config.IPFS, + MetadataStore: config.MetadataStore, + DefaultImageServiceAddress: defaultImageServiceAddress, + } + + fs, err := fusemanager.NewManagerClient(ctx, *rootDir, fmAddr, &fuseManagerConfig) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure fusemanager") + } + rs, err = snbase.NewSnapshotter(ctx, filepath.Join(*rootDir, "snapshotter"), fs, snbase.AsynchronousRemove) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter") + } + log.G(ctx).Infof("Start snapshotter with fusemanager mode") + } else { + credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure keychain") + } + + fsConfig := fsopts.Config{ + EnableIpfs: config.IPFS, + MetadataStore: config.MetadataStore, + } + fsOpts, err := fsopts.ConfigFsOpts(ctx, *rootDir, &fsConfig) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure fs config") + } + + rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config, + service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...)) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter") + } } cleanup, err := serve(ctx, rpc, *address, rs, config) @@ -185,6 +213,10 @@ func main() { log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter") } + // TODO: In detach mode, rs is taken over by fusemanager, + // but client will send unmount request to fusemanager, + // and fusemanager need get mount info from local db to + // determine its behavior if cleanup { log.G(ctx).Debug("Closing the snapshotter") rs.Close() @@ -275,48 +307,3 @@ func serve(ctx context.Context, rpc *grpc.Server, addr string, rs snapshots.Snap } return false, nil } - -const ( - memoryMetadataType = "memory" - dbMetadataType = "db" -) - -func getMetadataStore(rootDir string, config snapshotterConfig) (metadata.Store, error) { - switch config.MetadataStore { - case "", memoryMetadataType: - return memorymetadata.NewReader, nil - case dbMetadataType: - bOpts := bolt.Options{ - NoFreelistSync: true, - InitialMmapSize: 64 * 1024 * 1024, - FreelistType: bolt.FreelistMapType, - } - db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts) - if err != nil { - return nil, err - } - return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) { - return dbmetadata.NewReader(db, sr, opts...) - }, nil - default: - return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v", - config.MetadataStore, memoryMetadataType, dbMetadataType) - } -} - -func newCRIConn(criAddr string) (*grpc.ClientConn, error) { - // TODO: make gRPC options configurable from config.toml - backoffConfig := backoff.DefaultConfig - backoffConfig.MaxDelay = 3 * time.Second - connParams := grpc.ConnectParams{ - Backoff: backoffConfig, - } - gopts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithConnectParams(connParams), - grpc.WithContextDialer(dialer.ContextDialer), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), - grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), - } - return grpc.NewClient(dialer.DialAddress(criAddr), gopts...) -} diff --git a/cmd/go.mod b/cmd/go.mod index bec37eeec..25bfba62b 100644 --- a/cmd/go.mod +++ b/cmd/go.mod @@ -26,7 +26,6 @@ require ( golang.org/x/sync v0.9.0 golang.org/x/sys v0.26.0 google.golang.org/grpc v1.68.0 - k8s.io/cri-api v0.32.0-alpha.0 ) require ( @@ -141,6 +140,7 @@ require ( k8s.io/api v0.31.2 // indirect k8s.io/apimachinery v0.31.2 // indirect k8s.io/client-go v0.31.2 // indirect + k8s.io/cri-api v0.32.0-alpha.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect diff --git a/cmd/stargz-fuse-manager/main.go b/cmd/stargz-fuse-manager/main.go new file mode 100644 index 000000000..cae5ccd86 --- /dev/null +++ b/cmd/stargz-fuse-manager/main.go @@ -0,0 +1,57 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package main + +import ( + "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts" + fusemanager "github.com/containerd/stargz-snapshotter/fusemanager" + "github.com/containerd/stargz-snapshotter/service" + "github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig" +) + +func init() { + fusemanager.RegisterConfigFunc(func(cc *fusemanager.ConfigContext) ([]service.Option, error) { + fsConfig := fsopts.Config{ + EnableIpfs: cc.Config.IPFS, + MetadataStore: cc.Config.MetadataStore, + } + fsOpts, err := fsopts.ConfigFsOpts(cc.Ctx, cc.RootDir, &fsConfig) + if err != nil { + return nil, err + } + return []service.Option{service.WithFilesystemOptions(fsOpts...)}, nil + }) + + fusemanager.RegisterConfigFunc(func(cc *fusemanager.ConfigContext) ([]service.Option, error) { + keyChainConfig := keychainconfig.Config{ + EnableKubeKeychain: cc.Config.Config.KubeconfigKeychainConfig.EnableKeychain, + EnableCRIKeychain: cc.Config.Config.CRIKeychainConfig.EnableKeychain, + KubeconfigPath: cc.Config.Config.KubeconfigKeychainConfig.KubeconfigPath, + DefaultImageServiceAddress: cc.Config.DefaultImageServiceAddress, + ImageServicePath: cc.Config.Config.CRIKeychainConfig.ImageServicePath, + } + credsFuncs, err := keychainconfig.ConfigKeychain(cc.Ctx, cc.Server, &keyChainConfig) + if err != nil { + return nil, err + } + return []service.Option{service.WithCredsFuncs(credsFuncs...)}, nil + }) +} + +func main() { + fusemanager.Run() +} diff --git a/docs/overview.md b/docs/overview.md index ef438d8fa..30925b100 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -95,6 +95,14 @@ root@1d43741b8d29:/go# cat /.stargz-snapshotter/* {"digest":"sha256:f077511be7d385c17ba88980379c5cd0aab7068844dffa7a1cefbf68cc3daea3","size":580,"fetchedSize":580,"fetchedPercent":100} ``` +## Fuse manager + +Remote snapshots are mounted using FUSE, its filesystem process are attached to stargz snapshotter. If stargz snapshotter restarts (The reason may be a change of configuration or a crash), all filesystem process will be killed and restarted, which cause the remount of FUSE mountpoints, making running containers unavailable. + +To avoid this, we use a fuse daemon called fuse manager to handle filesystem process. Fuse manager is responsible for Mount and Unmount of remote snapshotters, its process is detached from stargz snapshotter main process to an independent one in a shim-like way during snapshotter's startup, so the restart of snapshotter won't affect filesystem process it manages, keeping mountpoints and running containers available during restart. But we still need to note that the restart of fuse manager itself triggers remount, so it's recommended to keep fuse manager running in good state. + +You can enable fuse manager by adding flag `--detach-fuse-manager=true` to stargz snapshotter. + ## Registry-related configuration You can configure stargz snapshotter for accessing registries with custom configurations. diff --git a/fs/fs.go b/fs/fs.go index 47191e99a..4547ae32e 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -72,6 +72,12 @@ const ( ) var fusermountBin = []string{"fusermount", "fusermount3"} +var ( + nsLock = sync.Mutex{} + + ns *metrics.Namespace + metricsCtr *layermetrics.Controller +) type Option func(*options) @@ -160,18 +166,20 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F return nil, fmt.Errorf("failed to setup resolver: %w", err) } - var ns *metrics.Namespace - if !cfg.NoPrometheus { + nsLock.Lock() + defer nsLock.Unlock() + + if !cfg.NoPrometheus && ns == nil { ns = metrics.NewNamespace("stargz", "fs", nil) logLevel := log.DebugLevel if fsOpts.metricsLogLevel != nil { logLevel = *fsOpts.metricsLogLevel } commonmetrics.Register(logLevel) // Register common metrics. This will happen only once. + metrics.Register(ns) // Register layer metrics. } - c := layermetrics.NewLayerMetrics(ns) - if ns != nil { - metrics.Register(ns) // Register layer metrics. + if metricsCtr == nil { + metricsCtr = layermetrics.NewLayerMetrics(ns) } return &filesystem{ @@ -185,7 +193,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F backgroundTaskManager: tm, allowNoVerification: cfg.AllowNoVerification, disableVerification: cfg.DisableVerification, - metricsController: c, + metricsController: metricsCtr, attrTimeout: attrTimeout, entryTimeout: entryTimeout, }, nil diff --git a/fusemanager/api/api.pb.go b/fusemanager/api/api.pb.go new file mode 100644 index 000000000..66017f1e1 --- /dev/null +++ b/fusemanager/api/api.pb.go @@ -0,0 +1,566 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: api.proto + +package api + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type StatusRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusRequest) Reset() { *m = StatusRequest{} } +func (m *StatusRequest) String() string { return proto.CompactTextString(m) } +func (*StatusRequest) ProtoMessage() {} +func (*StatusRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{0} +} +func (m *StatusRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatusRequest.Unmarshal(m, b) +} +func (m *StatusRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatusRequest.Marshal(b, m, deterministic) +} +func (m *StatusRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatusRequest.Merge(m, src) +} +func (m *StatusRequest) XXX_Size() int { + return xxx_messageInfo_StatusRequest.Size(m) +} +func (m *StatusRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StatusRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StatusRequest proto.InternalMessageInfo + +type InitRequest struct { + Root string `protobuf:"bytes,1,opt,name=root,proto3" json:"root,omitempty"` + Config []byte `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *InitRequest) Reset() { *m = InitRequest{} } +func (m *InitRequest) String() string { return proto.CompactTextString(m) } +func (*InitRequest) ProtoMessage() {} +func (*InitRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{1} +} +func (m *InitRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_InitRequest.Unmarshal(m, b) +} +func (m *InitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_InitRequest.Marshal(b, m, deterministic) +} +func (m *InitRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_InitRequest.Merge(m, src) +} +func (m *InitRequest) XXX_Size() int { + return xxx_messageInfo_InitRequest.Size(m) +} +func (m *InitRequest) XXX_DiscardUnknown() { + xxx_messageInfo_InitRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_InitRequest proto.InternalMessageInfo + +func (m *InitRequest) GetRoot() string { + if m != nil { + return m.Root + } + return "" +} + +func (m *InitRequest) GetConfig() []byte { + if m != nil { + return m.Config + } + return nil +} + +type MountRequest struct { + Mountpoint string `protobuf:"bytes,1,opt,name=mountpoint,proto3" json:"mountpoint,omitempty"` + Labels map[string]string `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MountRequest) Reset() { *m = MountRequest{} } +func (m *MountRequest) String() string { return proto.CompactTextString(m) } +func (*MountRequest) ProtoMessage() {} +func (*MountRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{2} +} +func (m *MountRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MountRequest.Unmarshal(m, b) +} +func (m *MountRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MountRequest.Marshal(b, m, deterministic) +} +func (m *MountRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_MountRequest.Merge(m, src) +} +func (m *MountRequest) XXX_Size() int { + return xxx_messageInfo_MountRequest.Size(m) +} +func (m *MountRequest) XXX_DiscardUnknown() { + xxx_messageInfo_MountRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_MountRequest proto.InternalMessageInfo + +func (m *MountRequest) GetMountpoint() string { + if m != nil { + return m.Mountpoint + } + return "" +} + +func (m *MountRequest) GetLabels() map[string]string { + if m != nil { + return m.Labels + } + return nil +} + +type CheckRequest struct { + Mountpoint string `protobuf:"bytes,1,opt,name=mountpoint,proto3" json:"mountpoint,omitempty"` + Labels map[string]string `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CheckRequest) Reset() { *m = CheckRequest{} } +func (m *CheckRequest) String() string { return proto.CompactTextString(m) } +func (*CheckRequest) ProtoMessage() {} +func (*CheckRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{3} +} +func (m *CheckRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CheckRequest.Unmarshal(m, b) +} +func (m *CheckRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CheckRequest.Marshal(b, m, deterministic) +} +func (m *CheckRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CheckRequest.Merge(m, src) +} +func (m *CheckRequest) XXX_Size() int { + return xxx_messageInfo_CheckRequest.Size(m) +} +func (m *CheckRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CheckRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CheckRequest proto.InternalMessageInfo + +func (m *CheckRequest) GetMountpoint() string { + if m != nil { + return m.Mountpoint + } + return "" +} + +func (m *CheckRequest) GetLabels() map[string]string { + if m != nil { + return m.Labels + } + return nil +} + +type UnmountRequest struct { + Mountpoint string `protobuf:"bytes,1,opt,name=mountpoint,proto3" json:"mountpoint,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UnmountRequest) Reset() { *m = UnmountRequest{} } +func (m *UnmountRequest) String() string { return proto.CompactTextString(m) } +func (*UnmountRequest) ProtoMessage() {} +func (*UnmountRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{4} +} +func (m *UnmountRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_UnmountRequest.Unmarshal(m, b) +} +func (m *UnmountRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_UnmountRequest.Marshal(b, m, deterministic) +} +func (m *UnmountRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_UnmountRequest.Merge(m, src) +} +func (m *UnmountRequest) XXX_Size() int { + return xxx_messageInfo_UnmountRequest.Size(m) +} +func (m *UnmountRequest) XXX_DiscardUnknown() { + xxx_messageInfo_UnmountRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_UnmountRequest proto.InternalMessageInfo + +func (m *UnmountRequest) GetMountpoint() string { + if m != nil { + return m.Mountpoint + } + return "" +} + +type StatusResponse struct { + Status int32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} +func (*StatusResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{5} +} +func (m *StatusResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StatusResponse.Unmarshal(m, b) +} +func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic) +} +func (m *StatusResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StatusResponse.Merge(m, src) +} +func (m *StatusResponse) XXX_Size() int { + return xxx_messageInfo_StatusResponse.Size(m) +} +func (m *StatusResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StatusResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StatusResponse proto.InternalMessageInfo + +func (m *StatusResponse) GetStatus() int32 { + if m != nil { + return m.Status + } + return 0 +} + +type Response struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{6} +} +func (m *Response) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Response.Unmarshal(m, b) +} +func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Response.Marshal(b, m, deterministic) +} +func (m *Response) XXX_Merge(src proto.Message) { + xxx_messageInfo_Response.Merge(m, src) +} +func (m *Response) XXX_Size() int { + return xxx_messageInfo_Response.Size(m) +} +func (m *Response) XXX_DiscardUnknown() { + xxx_messageInfo_Response.DiscardUnknown(m) +} + +var xxx_messageInfo_Response proto.InternalMessageInfo + +func init() { + proto.RegisterType((*StatusRequest)(nil), "fusemanager.StatusRequest") + proto.RegisterType((*InitRequest)(nil), "fusemanager.InitRequest") + proto.RegisterType((*MountRequest)(nil), "fusemanager.MountRequest") + proto.RegisterMapType((map[string]string)(nil), "fusemanager.MountRequest.LabelsEntry") + proto.RegisterType((*CheckRequest)(nil), "fusemanager.CheckRequest") + proto.RegisterMapType((map[string]string)(nil), "fusemanager.CheckRequest.LabelsEntry") + proto.RegisterType((*UnmountRequest)(nil), "fusemanager.UnmountRequest") + proto.RegisterType((*StatusResponse)(nil), "fusemanager.StatusResponse") + proto.RegisterType((*Response)(nil), "fusemanager.Response") +} + +func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) } + +var fileDescriptor_00212fb1f9d3bf1c = []byte{ + // 386 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x53, 0x51, 0x4b, 0xf3, 0x30, + 0x14, 0xa5, 0xdd, 0xd6, 0xef, 0xdb, 0xed, 0x9c, 0x12, 0x54, 0x6a, 0x05, 0x19, 0x05, 0xa1, 0x2f, + 0x6b, 0x65, 0x3e, 0xe8, 0x84, 0x3d, 0xa8, 0x28, 0x08, 0xee, 0xa5, 0xc3, 0x17, 0xdf, 0xb2, 0x92, + 0x75, 0x65, 0x6b, 0x52, 0x9b, 0x74, 0x30, 0x7f, 0x91, 0xff, 0xc5, 0x3f, 0x25, 0xcd, 0xba, 0x91, + 0x8a, 0x13, 0x84, 0xbd, 0xe5, 0x24, 0xf7, 0xdc, 0x9e, 0x7b, 0xcf, 0x29, 0x34, 0x71, 0x1a, 0x7b, + 0x69, 0xc6, 0x04, 0x43, 0xe6, 0x24, 0xe7, 0x24, 0xc1, 0x14, 0x47, 0x24, 0x73, 0xf6, 0x61, 0x6f, + 0x24, 0xb0, 0xc8, 0x79, 0x40, 0xde, 0x72, 0xc2, 0x85, 0xd3, 0x07, 0xf3, 0x89, 0xc6, 0xa2, 0x84, + 0x08, 0x41, 0x3d, 0x63, 0x4c, 0x58, 0x5a, 0x47, 0x73, 0x9b, 0x81, 0x3c, 0xa3, 0x63, 0x30, 0x42, + 0x46, 0x27, 0x71, 0x64, 0xe9, 0x1d, 0xcd, 0x6d, 0x05, 0x25, 0x72, 0x3e, 0x34, 0x68, 0x0d, 0x59, + 0x4e, 0x37, 0xe4, 0x33, 0x80, 0xa4, 0xc0, 0x29, 0x8b, 0xe9, 0xba, 0x85, 0x72, 0x83, 0x06, 0x60, + 0xcc, 0xf1, 0x98, 0xcc, 0xb9, 0xa5, 0x77, 0x6a, 0xae, 0xd9, 0x3b, 0xf7, 0x14, 0x69, 0x9e, 0xda, + 0xca, 0x7b, 0x96, 0x75, 0x0f, 0x54, 0x64, 0xcb, 0xa0, 0x24, 0xd9, 0x7d, 0x30, 0x95, 0x6b, 0x74, + 0x00, 0xb5, 0x19, 0x59, 0x96, 0x9f, 0x29, 0x8e, 0xe8, 0x10, 0x1a, 0x0b, 0x3c, 0xcf, 0x89, 0xd4, + 0xd9, 0x0c, 0x56, 0xe0, 0x46, 0xbf, 0xd6, 0xa4, 0xd4, 0xfb, 0x29, 0x09, 0x67, 0xbb, 0x91, 0xaa, + 0xb6, 0xda, 0xb5, 0xd4, 0x0b, 0x68, 0xbf, 0xd0, 0xe4, 0x0f, 0x6b, 0x75, 0x5c, 0x68, 0xaf, 0x3d, + 0xe5, 0x29, 0xa3, 0x9c, 0x14, 0x8e, 0x71, 0x79, 0x23, 0xab, 0x1b, 0x41, 0x89, 0x1c, 0x80, 0xff, + 0xeb, 0x9a, 0xde, 0xa7, 0x0e, 0xd6, 0x48, 0xe0, 0x2c, 0x7a, 0x7f, 0xcc, 0x39, 0x19, 0xae, 0x26, + 0x1b, 0x91, 0x6c, 0x11, 0x87, 0x04, 0xdd, 0x82, 0xb1, 0x6a, 0x89, 0xec, 0xca, 0xe0, 0x95, 0xec, + 0xd8, 0xa7, 0x3f, 0xbe, 0x95, 0x1a, 0xae, 0xa0, 0x5e, 0x04, 0x0b, 0x59, 0x95, 0x22, 0x25, 0x6b, + 0xf6, 0x51, 0xe5, 0x65, 0x43, 0xec, 0x43, 0x43, 0x46, 0x01, 0x9d, 0x6c, 0x8d, 0xc7, 0x2f, 0x54, + 0x69, 0xcd, 0x37, 0xaa, 0x6a, 0xd7, 0x36, 0xea, 0x00, 0xfe, 0x95, 0x6b, 0x47, 0xd5, 0xb1, 0xaa, + 0x66, 0x6c, 0xa1, 0xdf, 0xf9, 0xaf, 0xdd, 0x28, 0x16, 0xd3, 0x7c, 0xec, 0x85, 0x2c, 0xf1, 0xb9, + 0xdc, 0x6b, 0x97, 0x53, 0x9c, 0xf2, 0x29, 0x13, 0x82, 0x64, 0xbe, 0xc2, 0xf2, 0x71, 0x1a, 0x8f, + 0x0d, 0xf9, 0x73, 0x5e, 0x7e, 0x05, 0x00, 0x00, 0xff, 0xff, 0x9d, 0x24, 0xe1, 0x41, 0xa9, 0x03, + 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// StargzFuseManagerServiceClient is the client API for StargzFuseManagerService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type StargzFuseManagerServiceClient interface { + Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Response, error) + Mount(ctx context.Context, in *MountRequest, opts ...grpc.CallOption) (*Response, error) + Check(ctx context.Context, in *CheckRequest, opts ...grpc.CallOption) (*Response, error) + Unmount(ctx context.Context, in *UnmountRequest, opts ...grpc.CallOption) (*Response, error) +} + +type stargzFuseManagerServiceClient struct { + cc *grpc.ClientConn +} + +func NewStargzFuseManagerServiceClient(cc *grpc.ClientConn) StargzFuseManagerServiceClient { + return &stargzFuseManagerServiceClient{cc} +} + +func (c *stargzFuseManagerServiceClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { + out := new(StatusResponse) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Status", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Init", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Mount(ctx context.Context, in *MountRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Mount", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Check(ctx context.Context, in *CheckRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Check", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *stargzFuseManagerServiceClient) Unmount(ctx context.Context, in *UnmountRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/fusemanager.StargzFuseManagerService/Unmount", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// StargzFuseManagerServiceServer is the server API for StargzFuseManagerService service. +type StargzFuseManagerServiceServer interface { + Status(context.Context, *StatusRequest) (*StatusResponse, error) + Init(context.Context, *InitRequest) (*Response, error) + Mount(context.Context, *MountRequest) (*Response, error) + Check(context.Context, *CheckRequest) (*Response, error) + Unmount(context.Context, *UnmountRequest) (*Response, error) +} + +// UnimplementedStargzFuseManagerServiceServer can be embedded to have forward compatible implementations. +type UnimplementedStargzFuseManagerServiceServer struct { +} + +func (*UnimplementedStargzFuseManagerServiceServer) Status(ctx context.Context, req *StatusRequest) (*StatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Init(ctx context.Context, req *InitRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Init not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Mount(ctx context.Context, req *MountRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Mount not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Check(ctx context.Context, req *CheckRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Check not implemented") +} +func (*UnimplementedStargzFuseManagerServiceServer) Unmount(ctx context.Context, req *UnmountRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Unmount not implemented") +} + +func RegisterStargzFuseManagerServiceServer(s *grpc.Server, srv StargzFuseManagerServiceServer) { + s.RegisterService(&_StargzFuseManagerService_serviceDesc, srv) +} + +func _StargzFuseManagerService_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Status(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Status", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Status(ctx, req.(*StatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Init_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(InitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Init(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Init", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Init(ctx, req.(*InitRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Mount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MountRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Mount(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Mount", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Mount(ctx, req.(*MountRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Check(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Check", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Check(ctx, req.(*CheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _StargzFuseManagerService_Unmount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UnmountRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StargzFuseManagerServiceServer).Unmount(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fusemanager.StargzFuseManagerService/Unmount", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StargzFuseManagerServiceServer).Unmount(ctx, req.(*UnmountRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _StargzFuseManagerService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "fusemanager.StargzFuseManagerService", + HandlerType: (*StargzFuseManagerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Status", + Handler: _StargzFuseManagerService_Status_Handler, + }, + { + MethodName: "Init", + Handler: _StargzFuseManagerService_Init_Handler, + }, + { + MethodName: "Mount", + Handler: _StargzFuseManagerService_Mount_Handler, + }, + { + MethodName: "Check", + Handler: _StargzFuseManagerService_Check_Handler, + }, + { + MethodName: "Unmount", + Handler: _StargzFuseManagerService_Unmount_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "api.proto", +} diff --git a/fusemanager/api/api.proto b/fusemanager/api/api.proto new file mode 100644 index 000000000..3e13ca67a --- /dev/null +++ b/fusemanager/api/api.proto @@ -0,0 +1,58 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +syntax = "proto3"; + +option go_package = "github.com/stargz-snapshotter/fusemanager/api"; + +package fusemanager; + +service StargzFuseManagerService { + rpc Status (StatusRequest) returns (StatusResponse); + rpc Init (InitRequest) returns (Response); + rpc Mount (MountRequest) returns (Response); + rpc Check (CheckRequest) returns (Response); + rpc Unmount (UnmountRequest) returns (Response); +} + +message StatusRequest { +} + +message InitRequest { + string root = 1; + bytes config = 2; +} + +message MountRequest { + string mountpoint = 1; + map labels = 2; +} + +message CheckRequest { + string mountpoint = 1; + map labels = 2; +} + +message UnmountRequest { + string mountpoint = 1; +} + +message StatusResponse { + int32 status = 1; +} + +message Response { +} \ No newline at end of file diff --git a/fusemanager/api/generate.go b/fusemanager/api/generate.go new file mode 100644 index 000000000..2f61d5eee --- /dev/null +++ b/fusemanager/api/generate.go @@ -0,0 +1,19 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package api + +//go:generate protoc --gogo_out=paths=source_relative,plugins=grpc:. api.proto diff --git a/fusemanager/client.go b/fusemanager/client.go new file mode 100644 index 000000000..6f29a9ef6 --- /dev/null +++ b/fusemanager/client.go @@ -0,0 +1,141 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/containerd/containerd/v2/defaults" + "github.com/containerd/containerd/v2/pkg/dialer" + "github.com/containerd/log" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials/insecure" + + pb "github.com/containerd/stargz-snapshotter/fusemanager/api" + "github.com/containerd/stargz-snapshotter/snapshot" +) + +type Client struct { + client pb.StargzFuseManagerServiceClient +} + +func NewManagerClient(ctx context.Context, root, socket string, config *Config) (snapshot.FileSystem, error) { + grpcCli, err := newClient(socket) + if err != nil { + return nil, err + } + + client := &Client{ + client: grpcCli, + } + + err = client.init(ctx, root, config) + if err != nil { + return nil, err + } + + return client, nil +} + +func newClient(socket string) (pb.StargzFuseManagerServiceClient, error) { + connParams := grpc.ConnectParams{ + Backoff: backoff.DefaultConfig, + } + gopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithConnectParams(connParams), + grpc.WithContextDialer(dialer.ContextDialer), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize), + grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize), + ), + } + + conn, err := grpc.NewClient(fmt.Sprintf("unix://%s", socket), gopts...) + if err != nil { + return nil, err + } + + return pb.NewStargzFuseManagerServiceClient(conn), nil +} + +func (cli *Client) init(ctx context.Context, root string, config *Config) error { + configBytes, err := json.Marshal(config) + if err != nil { + return err + } + + req := &pb.InitRequest{ + Root: root, + Config: configBytes, + } + + _, err = cli.client.Init(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Init") + return err + } + + return nil +} + +func (cli *Client) Mount(ctx context.Context, mountpoint string, labels map[string]string) error { + req := &pb.MountRequest{ + Mountpoint: mountpoint, + Labels: labels, + } + + _, err := cli.client.Mount(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Mount") + return err + } + + return nil +} + +func (cli *Client) Check(ctx context.Context, mountpoint string, labels map[string]string) error { + req := &pb.CheckRequest{ + Mountpoint: mountpoint, + Labels: labels, + } + + _, err := cli.client.Check(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Check") + return err + } + + return nil +} + +func (cli *Client) Unmount(ctx context.Context, mountpoint string) error { + req := &pb.UnmountRequest{ + Mountpoint: mountpoint, + } + + _, err := cli.client.Unmount(ctx, req) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Unmount") + return err + } + + return nil +} diff --git a/fusemanager/fusemanager.go b/fusemanager/fusemanager.go new file mode 100644 index 000000000..6d9e288fb --- /dev/null +++ b/fusemanager/fusemanager.go @@ -0,0 +1,259 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "flag" + "fmt" + golog "log" + "net" + "os" + "os/exec" + "os/signal" + "path/filepath" + "syscall" + + "github.com/containerd/log" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" + "google.golang.org/grpc" + + pb "github.com/containerd/stargz-snapshotter/fusemanager/api" + "github.com/containerd/stargz-snapshotter/version" +) + +var ( + debugFlag bool + versionFlag bool + fuseStoreAddr string + address string + logLevel string + logPath string + action string +) + +func parseFlags() { + flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs") + flag.BoolVar(&versionFlag, "v", false, "show the fusemanager version and exit") + flag.StringVar(&action, "action", "", "action of fusemanager") + flag.StringVar(&fuseStoreAddr, "fusestore-path", "/var/lib/containerd-stargz-grpc/fusestore.db", "address for the fusemanager's store") + flag.StringVar(&address, "address", "/run/containerd-stargz-grpc/fuse-manager.sock", "address for the fusemanager's gRPC socket") + flag.StringVar(&logLevel, "log-level", logrus.InfoLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]") + flag.StringVar(&logPath, "log-path", "", "path to fusemanager's logs, no log recorded if empty") + + flag.Parse() +} + +func Run() { + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "failed to run fusemanager: %v", err) + os.Exit(1) + } +} + +func run() error { + parseFlags() + if versionFlag { + fmt.Printf("%s:\n", os.Args[0]) + fmt.Println(" Version: ", version.Version) + fmt.Println(" Revision:", version.Revision) + fmt.Println("") + return nil + } + + if fuseStoreAddr == "" || address == "" { + return fmt.Errorf("fusemanager fusestore and socket path cannot be empty") + } + + ctx := log.WithLogger(context.Background(), log.L) + + switch action { + case "start": + return startNew(ctx, logPath, address, fuseStoreAddr, logLevel) + default: + return runFuseManager(ctx) + } +} + +func startNew(ctx context.Context, logPath, address, fusestore, logLevel string) error { + self, err := os.Executable() + if err != nil { + return err + } + + cwd, err := os.Getwd() + if err != nil { + return err + } + + args := []string{ + "-address", address, + "-fusestore-path", fusestore, + "-log-level", logLevel, + } + + // we use shim-like approach to start new fusemanager process by self-invoking in the background + // and detach it from parent + cmd := exec.CommandContext(ctx, self, args...) + cmd.Dir = cwd + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + if logPath != "" { + err := os.Remove(logPath) + if err != nil && !os.IsNotExist(err) { + return err + } + file, err := os.Create(logPath) + if err != nil { + return err + } + cmd.Stdout = file + cmd.Stderr = file + } + + if err := cmd.Start(); err != nil { + return err + } + go cmd.Wait() + + if ready, err := waitUntilReady(ctx); err != nil || !ready { + if err != nil { + return fmt.Errorf("failed to start new fusemanager: %w", err) + } + if !ready { + return fmt.Errorf("failed to start new fusemanager, fusemanager not ready") + } + } + + return nil +} + +// waitUntilReady waits until fusemanager is ready to accept requests +func waitUntilReady(ctx context.Context) (bool, error) { + grpcCli, err := newClient(address) + if err != nil { + return false, err + } + + resp, err := grpcCli.Status(ctx, &pb.StatusRequest{}) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to call Status") + return false, err + } + + if resp.Status == FuseManagerNotReady { + return false, nil + } + + return true, nil +} + +func runFuseManager(ctx context.Context) error { + lvl, err := logrus.ParseLevel(logLevel) + if err != nil { + return fmt.Errorf("failed to prepare logger: %w", err) + } + + logrus.SetLevel(lvl) + logrus.SetFormatter(&logrus.JSONFormatter{ + TimestampFormat: log.RFC3339NanoFixed, + }) + + golog.SetOutput(log.G(ctx).WriterLevel(logrus.DebugLevel)) + + // Prepare the directory for the socket + if err := os.MkdirAll(filepath.Dir(address), 0700); err != nil { + return fmt.Errorf("failed to create directory %s: %w", filepath.Dir(address), err) + } + + // Try to remove the socket file to avoid EADDRINUSE + if err := os.Remove(address); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove old socket file: %w", err) + } + + l, err := net.Listen("unix", address) + if err != nil { + return fmt.Errorf("failed to listen socket: %w", err) + } + + server := grpc.NewServer() + fm, err := NewFuseManager(ctx, l, server, fuseStoreAddr) + if err != nil { + return fmt.Errorf("failed to configure manager server: %w", err) + } + + if err = fm.clearMounts(ctx); err != nil { + return fmt.Errorf("failed to clear mounts: %w", err) + } + + pb.RegisterStargzFuseManagerServiceServer(server, fm) + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, unix.SIGINT, unix.SIGTERM) + go func() { + sig := <-sigCh + log.G(ctx).Infof("Got %v", sig) + fm.server.Stop() + os.Remove(address) + }() + + if err = server.Serve(l); err != nil { + return fmt.Errorf("failed to serve fuse manager: %w", err) + } + + if err = fm.Close(ctx); err != nil { + return fmt.Errorf("failed to close fuse manager: %w", err) + } + + return nil +} + +func StartFuseManager(ctx context.Context, executable, address, fusestore, logLevel, logPath string) error { + // if socket exists, do not start it + if _, err := os.Stat(address); err == nil { + return nil + } else if !os.IsNotExist(err) { + return err + } + + if _, err := os.Stat(executable); err != nil { + log.G(ctx).WithError(err).Errorf("failed to stat fusemanager binary: %s", executable) + return err + } + + args := []string{ + "-action", "start", + "-address", address, + "-fusestore-path", fusestore, + "-log-level", logLevel, + "-log-path", logPath, + } + + cmd := exec.Command(executable, args...) + if err := cmd.Start(); err != nil { + return err + } + + if err := cmd.Wait(); err != nil { + return err + } + + return nil +} diff --git a/fusemanager/fusestore.go b/fusemanager/fusestore.go new file mode 100644 index 000000000..ebef19d88 --- /dev/null +++ b/fusemanager/fusestore.go @@ -0,0 +1,123 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "encoding/json" + + bolt "go.etcd.io/bbolt" + + "github.com/containerd/stargz-snapshotter/service" +) + +var ( + fuseInfoBucket = []byte("fuse-info-bucket") +) + +type fuseInfo struct { + Root string + Mountpoint string + Labels map[string]string + Config service.Config +} + +func (fm *Server) storeFuseInfo(fuseInfo *fuseInfo) error { + return fm.ms.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(fuseInfoBucket) + if err != nil { + return err + } + + key := []byte(fuseInfo.Mountpoint) + + val, err := json.Marshal(fuseInfo) + if err != nil { + return err + } + + err = bucket.Put(key, val) + if err != nil { + return err + } + + return nil + }) +} + +func (fm *Server) removeFuseInfo(fuseInfo *fuseInfo) error { + return fm.ms.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(fuseInfoBucket) + if err != nil { + return err + } + + key := []byte(fuseInfo.Mountpoint) + + err = bucket.Delete(key) + if err != nil { + return err + } + + return nil + }) +} + +// restoreFuseInfo restores fuseInfo when Init is called, it will skip mounted +// layers whose mountpoint can be found in fsMap +func (fm *Server) restoreFuseInfo(ctx context.Context) error { + return fm.ms.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(fuseInfoBucket) + if bucket == nil { + return nil + } + + return bucket.ForEach(func(_, v []byte) error { + mi := &fuseInfo{} + err := json.Unmarshal(v, mi) + if err != nil { + return err + } + + return fm.mount(ctx, mi.Mountpoint, mi.Labels) + }) + }) +} + +func (fm *Server) listMountpoints(ctx context.Context) ([]string, error) { + mountpoints := make([]string, 0) + err := fm.ms.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(fuseInfoBucket) + if bucket == nil { + return nil + } + return bucket.ForEach(func(_, v []byte) error { + mi := &fuseInfo{} + err := json.Unmarshal(v, mi) + if err != nil { + return err + } + mountpoints = append(mountpoints, mi.Mountpoint) + return nil + }) + }) + if err != nil { + return nil, err + } + + return mountpoints, nil +} diff --git a/fusemanager/service.go b/fusemanager/service.go new file mode 100644 index 000000000..186ceaea1 --- /dev/null +++ b/fusemanager/service.go @@ -0,0 +1,330 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package fusemanager + +import ( + "context" + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/containerd/log" + "github.com/moby/sys/mountinfo" + bolt "go.etcd.io/bbolt" + "google.golang.org/grpc" + + pb "github.com/containerd/stargz-snapshotter/fusemanager/api" + "github.com/containerd/stargz-snapshotter/service" + "github.com/containerd/stargz-snapshotter/snapshot" +) + +const ( + FuseManagerNotReady = iota + FuseManagerWaitInit + FuseManagerReady +) + +type Config struct { + Config *service.Config + IPFS bool `toml:"ipfs"` + MetadataStore string `toml:"metadata_store" default:"memory"` + DefaultImageServiceAddress string `json:"default_image_service_address"` +} + +type ConfigContext struct { + Ctx context.Context + Config *Config + RootDir string + Server *grpc.Server +} + +var ( + configFuncs []ConfigFunc + configMu sync.Mutex +) + +type ConfigFunc func(cc *ConfigContext) ([]service.Option, error) + +func RegisterConfigFunc(f ConfigFunc) { + configMu.Lock() + defer configMu.Unlock() + configFuncs = append(configFuncs, f) +} + +type Server struct { + pb.UnimplementedStargzFuseManagerServiceServer + + lock sync.RWMutex + status int32 + + listener net.Listener + server *grpc.Server + + // root is the latest root passed from containerd-stargz-grpc + root string + // config is the latest config passed from containerd-stargz-grpc + config *Config + // fsMap maps mountpoint to its filesystem instance to ensure Mount/Check/Unmount + // call the proper filesystem + fsMap sync.Map + // curFs is filesystem created by latest config + curFs snapshot.FileSystem + ms *bolt.DB + + fuseStoreAddr string +} + +func NewFuseManager(ctx context.Context, listener net.Listener, server *grpc.Server, fuseStoreAddr string) (*Server, error) { + if err := os.MkdirAll(filepath.Dir(fuseStoreAddr), 0700); err != nil { + return nil, fmt.Errorf("failed to create directory %q: %w", filepath.Dir(fuseStoreAddr), err) + } + + db, err := bolt.Open(fuseStoreAddr, 0666, &bolt.Options{Timeout: 10 * time.Second, ReadOnly: false}) + if err != nil { + return nil, fmt.Errorf("failed to configure fusestore: %w", err) + } + + fm := &Server{ + status: FuseManagerWaitInit, + lock: sync.RWMutex{}, + fsMap: sync.Map{}, + ms: db, + listener: listener, + server: server, + fuseStoreAddr: fuseStoreAddr, + } + + return fm, nil +} + +func (fm *Server) Status(ctx context.Context, _ *pb.StatusRequest) (*pb.StatusResponse, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + + return &pb.StatusResponse{ + Status: fm.status, + }, nil +} + +func (fm *Server) Init(ctx context.Context, req *pb.InitRequest) (*pb.Response, error) { + fm.lock.Lock() + fm.status = FuseManagerWaitInit + defer func() { + fm.status = FuseManagerReady + fm.lock.Unlock() + }() + + ctx = log.WithLogger(ctx, log.G(ctx)) + + config := &Config{} + err := json.Unmarshal(req.Config, config) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to get config") + return &pb.Response{}, err + } + fm.root = req.Root + fm.config = config + + cc := &ConfigContext{ + Ctx: ctx, + Config: fm.config, + RootDir: fm.root, + Server: fm.server, + } + + var opts []service.Option + for _, configFunc := range configFuncs { + funcOpts, err := configFunc(cc) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to apply config function") + return &pb.Response{}, err + } + opts = append(opts, funcOpts...) + } + + fs, err := service.NewFileSystem(ctx, fm.root, fm.config.Config, opts...) + if err != nil { + return &pb.Response{}, err + } + fm.curFs = fs + + err = fm.restoreFuseInfo(ctx) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to restore fuse info") + return &pb.Response{}, err + } + + return &pb.Response{}, nil +} + +func (fm *Server) Mount(ctx context.Context, req *pb.MountRequest) (*pb.Response, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + if fm.status != FuseManagerReady { + return &pb.Response{}, fmt.Errorf("fuse manager not ready") + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint)) + + err := fm.mount(ctx, req.Mountpoint, req.Labels) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to mount stargz") + return &pb.Response{}, err + } + + fm.storeFuseInfo(&fuseInfo{ + Root: fm.root, + Mountpoint: req.Mountpoint, + Labels: req.Labels, + Config: *fm.config.Config, + }) + + return &pb.Response{}, nil +} + +func (fm *Server) Check(ctx context.Context, req *pb.CheckRequest) (*pb.Response, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + if fm.status != FuseManagerReady { + return &pb.Response{}, fmt.Errorf("fuse manager not ready") + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint)) + + obj, found := fm.fsMap.Load(req.Mountpoint) + if !found { + err := fmt.Errorf("failed to find filesystem of mountpoint %s", req.Mountpoint) + log.G(ctx).WithError(err).Errorf("failed to check filesystem") + return &pb.Response{}, err + } + + fs := obj.(snapshot.FileSystem) + err := fs.Check(ctx, req.Mountpoint, req.Labels) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to check filesystem") + return &pb.Response{}, err + } + + return &pb.Response{}, nil +} + +func (fm *Server) Unmount(ctx context.Context, req *pb.UnmountRequest) (*pb.Response, error) { + fm.lock.RLock() + defer fm.lock.RUnlock() + if fm.status != FuseManagerReady { + return &pb.Response{}, fmt.Errorf("fuse manager not ready") + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint)) + + obj, found := fm.fsMap.Load(req.Mountpoint) + if !found { + // check whether already unmounted + mounts, err := mountinfo.GetMounts(func(info *mountinfo.Info) (skip, stop bool) { + if info.Mountpoint == req.Mountpoint { + return false, true + } + return true, false + }) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to get mount info") + return &pb.Response{}, err + } + + if len(mounts) <= 0 { + return &pb.Response{}, nil + } + err = fmt.Errorf("failed to find filesystem of mountpoint %s", req.Mountpoint) + log.G(ctx).WithError(err).Errorf("failed to unmount filesystem") + return &pb.Response{}, err + } + + fs := obj.(snapshot.FileSystem) + err := fs.Unmount(ctx, req.Mountpoint) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to unmount filesystem") + return &pb.Response{}, err + } + + fm.fsMap.Delete(req.Mountpoint) + fm.removeFuseInfo(&fuseInfo{ + Mountpoint: req.Mountpoint, + }) + + return &pb.Response{}, nil +} + +func (fm *Server) Close(ctx context.Context) error { + fm.lock.Lock() + defer fm.lock.Unlock() + fm.status = FuseManagerNotReady + + err := fm.clearMounts(ctx) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to clear mounts") + return err + } + + err = fm.ms.Close() + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to close fusestore") + return err + } + + if err := os.Remove(fm.fuseStoreAddr); err != nil { + log.G(ctx).WithError(err).Errorf("failed to remove fusestore file %s", fm.fuseStoreAddr) + return err + } + + return nil +} + +func (fm *Server) mount(ctx context.Context, mountpoint string, labels map[string]string) error { + // mountpoint in fsMap means layer is already mounted, skip it + if _, found := fm.fsMap.Load(mountpoint); found { + return nil + } + + err := fm.curFs.Mount(ctx, mountpoint, labels) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to mount stargz") + return err + } + + fm.fsMap.Store(mountpoint, fm.curFs) + return nil +} + +func (fm *Server) clearMounts(ctx context.Context) error { + mountpoints, err := fm.listMountpoints(ctx) + if err != nil { + return err + } + + for _, mp := range mountpoints { + if err := syscall.Unmount(mp, syscall.MNT_FORCE); err != nil { + return err + } + } + + return nil +} diff --git a/go.mod b/go.mod index 3e6a9068c..532b50471 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,8 @@ require ( github.com/opencontainers/runtime-spec v1.2.0 github.com/prometheus/client_golang v1.20.5 github.com/rs/xid v1.6.0 + github.com/sirupsen/logrus v1.9.3 + go.etcd.io/bbolt v1.3.11 golang.org/x/sync v0.9.0 golang.org/x/sys v0.26.0 google.golang.org/grpc v1.68.0 @@ -90,7 +92,6 @@ require ( github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.9.0 // indirect github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect @@ -98,7 +99,6 @@ require ( github.com/vbatts/tar-split v0.11.6 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect - go.etcd.io/bbolt v1.3.11 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.31.0 // indirect diff --git a/service/keychain/keychainconfig/keychainconfig.go b/service/keychain/keychainconfig/keychainconfig.go new file mode 100644 index 000000000..3cf973a1f --- /dev/null +++ b/service/keychain/keychainconfig/keychainconfig.go @@ -0,0 +1,88 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package keychainconfig + +import ( + "context" + "time" + + "github.com/containerd/containerd/v2/defaults" + "github.com/containerd/containerd/v2/pkg/dialer" + "github.com/containerd/stargz-snapshotter/service/keychain/cri" + "github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig" + "github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig" + "github.com/containerd/stargz-snapshotter/service/resolver" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials/insecure" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1" +) + +type Config struct { + EnableKubeKeychain bool + EnableCRIKeychain bool + KubeconfigPath string + DefaultImageServiceAddress string + ImageServicePath string +} + +func ConfigKeychain(ctx context.Context, rpc *grpc.Server, config *Config) ([]resolver.Credential, error) { + credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)} + if config.EnableKubeKeychain { + var opts []kubeconfig.Option + if kcp := config.KubeconfigPath; kcp != "" { + opts = append(opts, kubeconfig.WithKubeconfigPath(kcp)) + } + credsFuncs = append(credsFuncs, kubeconfig.NewKubeconfigKeychain(ctx, opts...)) + } + if config.EnableCRIKeychain { + // connects to the backend CRI service (defaults to containerd socket) + criAddr := config.DefaultImageServiceAddress + if cp := config.ImageServicePath; cp != "" { + criAddr = cp + } + connectCRI := func() (runtime.ImageServiceClient, error) { + conn, err := newCRIConn(criAddr) + if err != nil { + return nil, err + } + return runtime.NewImageServiceClient(conn), nil + } + f, criServer := cri.NewCRIKeychain(ctx, connectCRI) + runtime.RegisterImageServiceServer(rpc, criServer) + credsFuncs = append(credsFuncs, f) + } + + return credsFuncs, nil +} + +func newCRIConn(criAddr string) (*grpc.ClientConn, error) { + // TODO: make gRPC options configurable from config.toml + backoffConfig := backoff.DefaultConfig + backoffConfig.MaxDelay = 3 * time.Second + connParams := grpc.ConnectParams{ + Backoff: backoffConfig, + } + gopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithConnectParams(connParams), + grpc.WithContextDialer(dialer.ContextDialer), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), + } + return grpc.NewClient(dialer.DialAddress(criAddr), gopts...) +} diff --git a/service/service.go b/service/service.go index 28c49b9fe..fc1d71fcf 100644 --- a/service/service.go +++ b/service/service.go @@ -18,6 +18,7 @@ package service import ( "context" + "fmt" "path/filepath" "github.com/containerd/containerd/v2/core/snapshots" @@ -30,6 +31,7 @@ import ( "github.com/containerd/stargz-snapshotter/metadata" esgzexternaltoc "github.com/containerd/stargz-snapshotter/nativeconverter/estargz/externaltoc" "github.com/containerd/stargz-snapshotter/service/resolver" + "github.com/containerd/stargz-snapshotter/snapshot" snbase "github.com/containerd/stargz-snapshotter/snapshot" "github.com/hashicorp/go-multierror" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -66,6 +68,27 @@ func WithFilesystemOptions(opts ...stargzfs.Option) Option { // NewStargzSnapshotterService returns stargz snapshotter. func NewStargzSnapshotterService(ctx context.Context, root string, config *Config, opts ...Option) (snapshots.Snapshotter, error) { + fs, err := NewFileSystem(ctx, root, config, opts...) + if err != nil { + return nil, fmt.Errorf("failed to configure filesystem: %w", err) + } + + var snapshotter snapshots.Snapshotter + + snOpts := []snbase.Opt{snbase.AsynchronousRemove} + if config.SnapshotterConfig.AllowInvalidMountsOnRestart { + snOpts = append(snOpts, snbase.AllowInvalidMountsOnRestart) + } + + snapshotter, err = snbase.NewSnapshotter(ctx, snapshotterRoot(root), fs, snOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create new snapshotter: %w", err) + } + + return snapshotter, nil +} + +func NewFileSystem(ctx context.Context, root string, config *Config, opts ...Option) (snapshot.FileSystem, error) { var sOpts options for _, o := range opts { o(&sOpts) @@ -97,22 +120,10 @@ func NewStargzSnapshotterService(ctx context.Context, root string, config *Confi ) fs, err := stargzfs.NewFilesystem(fsRoot(root), config.Config, fsOpts...) if err != nil { - log.G(ctx).WithError(err).Fatalf("failed to configure filesystem") - } - - var snapshotter snapshots.Snapshotter - - snOpts := []snbase.Opt{snbase.AsynchronousRemove} - if config.SnapshotterConfig.AllowInvalidMountsOnRestart { - snOpts = append(snOpts, snbase.AllowInvalidMountsOnRestart) - } - - snapshotter, err = snbase.NewSnapshotter(ctx, snapshotterRoot(root), fs, snOpts...) - if err != nil { - log.G(ctx).WithError(err).Fatalf("failed to create new snapshotter") + return nil, err } - return snapshotter, err + return fs, nil } func snapshotterRoot(root string) string {