Skip to content

Commit

Permalink
Add fuse-manager
Browse files Browse the repository at this point in the history
Signed-off-by: abushwang <[email protected]>
Co-authored-by: Zuti He <[email protected]>
  • Loading branch information
wswsmao and ilyee committed Dec 13, 2024
1 parent ff392c1 commit 0c51d92
Show file tree
Hide file tree
Showing 17 changed files with 1,863 additions and 124 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ REVISION=$(shell git rev-parse HEAD)$(shell if ! git diff --no-ext-diff --quiet
GO_BUILD_LDFLAGS ?= -s -w
GO_LD_FLAGS=-ldflags '$(GO_BUILD_LDFLAGS) -X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) $(GO_EXTRA_LDFLAGS)'

CMD=containerd-stargz-grpc ctr-remote stargz-store
CMD=containerd-stargz-grpc ctr-remote stargz-store stargz-fuse-manager

CMD_BINARIES=$(addprefix $(PREFIX),$(CMD))

Expand All @@ -48,6 +48,9 @@ stargz-store: FORCE
stargz-store-helper: FORCE
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-store/helper

stargz-fuse-manager: FORCE
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-fuse-manager

check:
@echo "$@"
@GO111MODULE=$(GO111MODULE_VALUE) $(shell go env GOPATH)/bin/golangci-lint run
Expand Down
81 changes: 81 additions & 0 deletions cmd/containerd-stargz-grpc/fsopts/fsopts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsopts

import (
"context"
"fmt"
"io"
"path/filepath"

"github.com/containerd/log"
dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db"
ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs"
"github.com/containerd/stargz-snapshotter/fs"
"github.com/containerd/stargz-snapshotter/metadata"
memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory"
bolt "go.etcd.io/bbolt"
)

type Config struct {
EnableIpfs bool
MetadataStore string
}

const (
memoryMetadataType = "memory"
dbMetadataType = "db"
)

func ConfigFsOpts(ctx context.Context, rootDir string, config *Config) ([]fs.Option, error) {
fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)}

if config.EnableIpfs {
fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler)))
}

mt, err := getMetadataStore(rootDir, config)
if err != nil {
return nil, fmt.Errorf("failed to configure metadata store: %w", err)
}
fsOpts = append(fsOpts, fs.WithMetadataStore(mt))

return fsOpts, nil
}

func getMetadataStore(rootDir string, config *Config) (metadata.Store, error) {
switch config.MetadataStore {
case "", memoryMetadataType:
return memorymetadata.NewReader, nil
case dbMetadataType:
bOpts := bolt.Options{
NoFreelistSync: true,
InitialMmapSize: 64 * 1024 * 1024,
FreelistType: bolt.FreelistMapType,
}
db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts)
if err != nil {
return nil, err
}
return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) {
return dbmetadata.NewReader(db, sr, opts...)
}, nil
default:
return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v",
config.MetadataStore, memoryMetadataType, dbMetadataType)
}
}
187 changes: 87 additions & 100 deletions cmd/containerd-stargz-grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,32 @@ import (
"context"
"flag"
"fmt"
"io"
golog "log"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"time"

snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/v2/contrib/snapshotservice"
"github.com/containerd/containerd/v2/core/snapshots"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/dialer"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/log"
dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db"
ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs"
"github.com/containerd/stargz-snapshotter/fs"
"github.com/containerd/stargz-snapshotter/metadata"
memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory"
"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
"github.com/containerd/stargz-snapshotter/fusemanager"
"github.com/containerd/stargz-snapshotter/service"
"github.com/containerd/stargz-snapshotter/service/keychain/cri"
"github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig"
"github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig"
"github.com/containerd/stargz-snapshotter/service/resolver"
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
snbase "github.com/containerd/stargz-snapshotter/snapshot"
"github.com/containerd/stargz-snapshotter/version"
sddaemon "github.com/coreos/go-systemd/v22/daemon"
metrics "github.com/docker/go-metrics"
"github.com/pelletier/go-toml"
bolt "go.etcd.io/bbolt"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)

const (
Expand All @@ -65,14 +54,19 @@ const (
defaultLogLevel = log.InfoLevel
defaultRootDir = "/var/lib/containerd-stargz-grpc"
defaultImageServiceAddress = "/run/containerd/containerd.sock"
defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-manager.sock"

fuseManagerBin = "stargz-fuse-manager"
fuseManagerAddress = "fuse-manager.sock"
)

var (
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
printVersion = flag.Bool("version", false, "print the version")
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
detachFuseManager = flag.Bool("detach-fuse-manager", false, "whether detach fusemanager or not")
printVersion = flag.Bool("version", false, "print the version")
)

type snapshotterConfig struct {
Expand All @@ -92,6 +86,11 @@ type snapshotterConfig struct {

// MetadataStore is the type of the metadata store to use.
MetadataStore string `toml:"metadata_store" default:"memory"`
// FuseManagerAddress is address for the fusemanager's GRPC server
FuseManagerAddress string `toml:"fusemanager_address"`

// FuseManagerPath is path to the fusemanager's executable
FuseManagerPath string `toml:"fusemanager_path"`
}

func main() {
Expand Down Expand Up @@ -140,51 +139,84 @@ func main() {
}

// Configure keychain
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
if config.Config.KubeconfigKeychainConfig.EnableKeychain {
var opts []kubeconfig.Option
if kcp := config.Config.KubeconfigKeychainConfig.KubeconfigPath; kcp != "" {
opts = append(opts, kubeconfig.WithKubeconfigPath(kcp))
}
credsFuncs = append(credsFuncs, kubeconfig.NewKubeconfigKeychain(ctx, opts...))
keyChainConfig := keychainconfig.Config{
EnableKubeKeychain: config.Config.KubeconfigKeychainConfig.EnableKeychain,
EnableCRIKeychain: config.Config.CRIKeychainConfig.EnableKeychain,
KubeconfigPath: config.Config.KubeconfigKeychainConfig.KubeconfigPath,
DefaultImageServiceAddress: defaultImageServiceAddress,
ImageServicePath: config.CRIKeychainConfig.ImageServicePath,
}
if config.Config.CRIKeychainConfig.EnableKeychain {
// connects to the backend CRI service (defaults to containerd socket)
criAddr := defaultImageServiceAddress
if cp := config.CRIKeychainConfig.ImageServicePath; cp != "" {
criAddr = cp
}
connectCRI := func() (runtime.ImageServiceClient, error) {
conn, err := newCRIConn(criAddr)

var rs snapshots.Snapshotter
if *detachFuseManager {
fmPath := config.FuseManagerPath
if fmPath == "" {
var err error
fmPath, err = exec.LookPath(fuseManagerBin)
if err != nil {
return nil, err
log.G(ctx).WithError(err).Fatalf("failed to find fusemanager bin")
}
return runtime.NewImageServiceClient(conn), nil
}
f, criServer := cri.NewCRIKeychain(ctx, connectCRI)
runtime.RegisterImageServiceServer(rpc, criServer)
credsFuncs = append(credsFuncs, f)
}
fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)}
if config.IPFS {
fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler)))
}
mt, err := getMetadataStore(*rootDir, config)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure metadata store")
}
fsOpts = append(fsOpts, fs.WithMetadataStore(mt))
rs, err := service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
fmAddr := config.FuseManagerAddress
if fmAddr == "" {
fmAddr = defaultFuseManagerAddress
}

if !filepath.IsAbs(fmAddr) {
log.G(ctx).WithError(err).Fatalf("fuse manager address must be an absolute path: %s", fmAddr)
}
err := fusemanager.StartFuseManager(ctx, fmPath, fmAddr, filepath.Join(*rootDir, "fusestore.db"), *logLevel, filepath.Join(*rootDir, "stargz-fuse-manager.log"))
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to start fusemanager")
}

fuseManagerConfig := fusemanager.Config{
Config: &config.Config,
IPFS: config.IPFS,
MetadataStore: config.MetadataStore,
DefaultImageServiceAddress: defaultImageServiceAddress,
}

fs, err := fusemanager.NewManagerClient(ctx, *rootDir, fmAddr, &fuseManagerConfig)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure fusemanager")
}
rs, err = snbase.NewSnapshotter(ctx, filepath.Join(*rootDir, "snapshotter"), fs, snbase.AsynchronousRemove)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
}
log.G(ctx).Infof("Start snapshotter with fusemanager mode")
} else {
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
}

fsConfig := fsopts.Config{
EnableIpfs: config.IPFS,
MetadataStore: config.MetadataStore,
}
fsOpts, err := fsopts.ConfigFsOpts(ctx, *rootDir, &fsConfig)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure fs config")
}

rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
}
}

cleanup, err := serve(ctx, rpc, *address, rs, config)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter")
}

// TODO: In detach mode, rs is taken over by fusemanager,
// but client will send unmount request to fusemanager,
// and fusemanager need get mount info from local db to
// determine its behavior
if cleanup {
log.G(ctx).Debug("Closing the snapshotter")
rs.Close()
Expand Down Expand Up @@ -275,48 +307,3 @@ func serve(ctx context.Context, rpc *grpc.Server, addr string, rs snapshots.Snap
}
return false, nil
}

const (
memoryMetadataType = "memory"
dbMetadataType = "db"
)

func getMetadataStore(rootDir string, config snapshotterConfig) (metadata.Store, error) {
switch config.MetadataStore {
case "", memoryMetadataType:
return memorymetadata.NewReader, nil
case dbMetadataType:
bOpts := bolt.Options{
NoFreelistSync: true,
InitialMmapSize: 64 * 1024 * 1024,
FreelistType: bolt.FreelistMapType,
}
db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts)
if err != nil {
return nil, err
}
return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) {
return dbmetadata.NewReader(db, sr, opts...)
}, nil
default:
return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v",
config.MetadataStore, memoryMetadataType, dbMetadataType)
}
}

func newCRIConn(criAddr string) (*grpc.ClientConn, error) {
// TODO: make gRPC options configurable from config.toml
backoffConfig := backoff.DefaultConfig
backoffConfig.MaxDelay = 3 * time.Second
connParams := grpc.ConnectParams{
Backoff: backoffConfig,
}
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithConnectParams(connParams),
grpc.WithContextDialer(dialer.ContextDialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
}
return grpc.NewClient(dialer.DialAddress(criAddr), gopts...)
}
2 changes: 1 addition & 1 deletion cmd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ require (
golang.org/x/sync v0.9.0
golang.org/x/sys v0.26.0
google.golang.org/grpc v1.68.0
k8s.io/cri-api v0.32.0-alpha.0
)

require (
Expand Down Expand Up @@ -141,6 +140,7 @@ require (
k8s.io/api v0.31.2 // indirect
k8s.io/apimachinery v0.31.2 // indirect
k8s.io/client-go v0.31.2 // indirect
k8s.io/cri-api v0.32.0-alpha.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
Expand Down
Loading

0 comments on commit 0c51d92

Please sign in to comment.