From 7c333bf196a8781ba95abd4fd38a49a3b7e81be2 Mon Sep 17 00:00:00 2001 From: abushwang Date: Mon, 9 Dec 2024 17:13:31 +0800 Subject: [PATCH] fusemanager: resolve review comments Signed-off-by: abushwang --- cmd/containerd-stargz-grpc/fsopts/fsopts.go | 4 ++-- cmd/containerd-stargz-grpc/main.go | 19 +++++++----------- cmd/stargz-fuse-manager/fusemanager/client.go | 18 ++++++++--------- .../fusemanager/fusemanager.go | 11 ++++------ .../fusemanager/service.go | 20 +++++++++---------- fs/fs.go | 2 +- .../keychain/keychainconfig/keychainconfig.go | 9 ++++----- 7 files changed, 37 insertions(+), 46 deletions(-) diff --git a/cmd/containerd-stargz-grpc/fsopts/fsopts.go b/cmd/containerd-stargz-grpc/fsopts/fsopts.go index e678f52bb..269ad3cec 100644 --- a/cmd/containerd-stargz-grpc/fsopts/fsopts.go +++ b/cmd/containerd-stargz-grpc/fsopts/fsopts.go @@ -37,8 +37,8 @@ type Config struct { } const ( - memoryMetadataType = "memory" - dbMetadataType = "db" + memoryMetadataType = "memory" + dbMetadataType = "db" ) func ConfigFsOpts(ctx context.Context, rootDir string, config *Config) ([]fs.Option, error) { diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go index 596002730..21c572bb7 100644 --- a/cmd/containerd-stargz-grpc/main.go +++ b/cmd/containerd-stargz-grpc/main.go @@ -46,7 +46,6 @@ import ( "github.com/pelletier/go-toml" "golang.org/x/sys/unix" "google.golang.org/grpc" - runtime "k8s.io/cri-api/pkg/apis/runtime/v1" ) const ( @@ -55,10 +54,10 @@ const ( defaultLogLevel = log.InfoLevel defaultRootDir = "/var/lib/containerd-stargz-grpc" defaultImageServiceAddress = "/run/containerd/containerd.sock" - defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-namanger.sock" + defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-manager.sock" fuseManagerBin = "stargz-fuse-manager" - fuseManagerAddress = "fuse-mananger.sock" + fuseManagerAddress = "fuse-manager.sock" ) var ( @@ -148,15 +147,6 @@ func main() { ImageServicePath: config.CRIKeychainConfig.ImageServicePath, } - credsFuncs, criServer, err := keychainconfig.ConfigKeychain(ctx, &keyChainConfig) - if err != nil { - log.G(ctx).WithError(err).Fatalf("failed to configure keychain") - } - - if config.Config.CRIKeychainConfig.EnableKeychain { - runtime.RegisterImageServiceServer(rpc, criServer) - } - var rs snapshots.Snapshotter if *detachFuseManager { fmPath := config.FuseManagerPath @@ -197,6 +187,11 @@ func main() { } log.G(ctx).Infof("Start snapshotter with fusemanager mode") } else { + credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig) + if err != nil { + log.G(ctx).WithError(err).Fatalf("failed to configure keychain") + } + fsConfig := fsopts.Config{ EnableIpfs: config.IPFS, MetadataStore: config.MetadataStore, diff --git a/cmd/stargz-fuse-manager/fusemanager/client.go b/cmd/stargz-fuse-manager/fusemanager/client.go index 3a38dc203..143b15163 100644 --- a/cmd/stargz-fuse-manager/fusemanager/client.go +++ b/cmd/stargz-fuse-manager/fusemanager/client.go @@ -30,6 +30,7 @@ import ( pb "github.com/containerd/stargz-snapshotter/cmd/stargz-fuse-manager/fusemanager/api" "github.com/containerd/stargz-snapshotter/snapshot" + "google.golang.org/grpc/credentials/insecure" ) type Client struct { @@ -37,9 +38,7 @@ type Client struct { } func NewManagerClient(ctx context.Context, root, socket string, config *Config) (snapshot.FileSystem, error) { - timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - grpcCli, err := newClient(timeoutCtx, socket) + grpcCli, err := newClient(socket) if err != nil { return nil, err } @@ -56,21 +55,22 @@ func NewManagerClient(ctx context.Context, root, socket string, config *Config) return client, nil } -func newClient(ctx context.Context, socket string) (pb.StargzFuseManagerServiceClient, error) { +func newClient(socket string) (pb.StargzFuseManagerServiceClient, error) { + backoffConfig := backoff.DefaultConfig + backoffConfig.MaxDelay = 3 * time.Second connParams := grpc.ConnectParams{ - Backoff: backoff.DefaultConfig, + Backoff: backoffConfig, } + gopts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithInsecure(), - grpc.FailOnNonTempDialError(true), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithConnectParams(connParams), grpc.WithContextDialer(dialer.ContextDialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), } - conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix://%s", socket), gopts...) + conn, err := grpc.NewClient(dialer.DialAddress(fmt.Sprintf("unix://%s", socket)), gopts...) if err != nil { return nil, err } diff --git a/cmd/stargz-fuse-manager/fusemanager/fusemanager.go b/cmd/stargz-fuse-manager/fusemanager/fusemanager.go index 31a3542c6..dd58ace98 100644 --- a/cmd/stargz-fuse-manager/fusemanager/fusemanager.go +++ b/cmd/stargz-fuse-manager/fusemanager/fusemanager.go @@ -27,7 +27,6 @@ import ( "os/signal" "path/filepath" "syscall" - "time" "github.com/containerd/log" "github.com/pkg/errors" @@ -135,7 +134,7 @@ func startNew(ctx context.Context, logPath, address, fusestore, logLevel string) } go cmd.Wait() - if ready, err := waitUntilReady(ctx, 10); err != nil || !ready { + if ready, err := waitUntilReady(ctx); err != nil || !ready { if err != nil { return errors.Wrapf(err, "failed to start new fusemanager") } @@ -147,11 +146,9 @@ func startNew(ctx context.Context, logPath, address, fusestore, logLevel string) return nil } -// waitUntilReady waits until fusemanager is ready to accept requests with timeout -func waitUntilReady(ctx context.Context, timeout int) (bool, error) { - timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second) - defer cancel() - grpcCli, err := newClient(timeoutCtx, address) +// waitUntilReady waits until fusemanager is ready to accept requests +func waitUntilReady(ctx context.Context) (bool, error) { + grpcCli, err := newClient(address) if err != nil { return false, err } diff --git a/cmd/stargz-fuse-manager/fusemanager/service.go b/cmd/stargz-fuse-manager/fusemanager/service.go index 9c8fc145a..e2a1fb7a2 100644 --- a/cmd/stargz-fuse-manager/fusemanager/service.go +++ b/cmd/stargz-fuse-manager/fusemanager/service.go @@ -47,9 +47,9 @@ const ( ) type Config struct { - Config *service.Config - IPFS bool - MetadataStore string + Config *service.Config + IPFS bool + MetadataStore string DefaultImageServiceAddress string } @@ -87,12 +87,12 @@ func NewFuseManager(ctx context.Context, listener net.Listener, server *grpc.Ser } fm := &Server{ - status: FuseManagerWaitInit, - lock: sync.RWMutex{}, - fsMap: sync.Map{}, - ms: db, - listener: listener, - server: server, + status: FuseManagerWaitInit, + lock: sync.RWMutex{}, + fsMap: sync.Map{}, + ms: db, + listener: listener, + server: server, fuseStoreAddr: fuseStoreAddr, } @@ -136,7 +136,7 @@ func (fm *Server) Init(ctx context.Context, req *pb.InitRequest) (*pb.Response, ImageServicePath: config.Config.CRIKeychainConfig.ImageServicePath, } - credsFuncs, _, err := keychainconfig.ConfigKeychain(ctx, &keyChainConfig) + credsFuncs, err := keychainconfig.ConfigKeychain(ctx, fm.server, &keyChainConfig) if err != nil { log.G(ctx).WithError(err).Fatalf("failed to configure keychain") } diff --git a/fs/fs.go b/fs/fs.go index 35e7e20bc..4547ae32e 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -176,7 +176,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F logLevel = *fsOpts.metricsLogLevel } commonmetrics.Register(logLevel) // Register common metrics. This will happen only once. - metrics.Register(ns) // Register layer metrics. + metrics.Register(ns) // Register layer metrics. } if metricsCtr == nil { metricsCtr = layermetrics.NewLayerMetrics(ns) diff --git a/service/keychain/keychainconfig/keychainconfig.go b/service/keychain/keychainconfig/keychainconfig.go index c28c23983..3cf973a1f 100644 --- a/service/keychain/keychainconfig/keychainconfig.go +++ b/service/keychain/keychainconfig/keychainconfig.go @@ -40,9 +40,8 @@ type Config struct { ImageServicePath string } -func ConfigKeychain(ctx context.Context, config *Config) ([]resolver.Credential, runtime.ImageServiceServer, error) { +func ConfigKeychain(ctx context.Context, rpc *grpc.Server, config *Config) ([]resolver.Credential, error) { credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)} - var criServer runtime.ImageServiceServer if config.EnableKubeKeychain { var opts []kubeconfig.Option if kcp := config.KubeconfigPath; kcp != "" { @@ -63,12 +62,12 @@ func ConfigKeychain(ctx context.Context, config *Config) ([]resolver.Credential, } return runtime.NewImageServiceClient(conn), nil } - f, server := cri.NewCRIKeychain(ctx, connectCRI) + f, criServer := cri.NewCRIKeychain(ctx, connectCRI) + runtime.RegisterImageServiceServer(rpc, criServer) credsFuncs = append(credsFuncs, f) - criServer = server } - return credsFuncs, criServer, nil + return credsFuncs, nil } func newCRIConn(criAddr string) (*grpc.ClientConn, error) {