Skip to content

Commit

Permalink
fusemanager: resolve review comments
Browse files Browse the repository at this point in the history
Signed-off-by: abushwang <[email protected]>
  • Loading branch information
wswsmao committed Dec 9, 2024
1 parent 5a4c3ff commit 7c333bf
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 46 deletions.
4 changes: 2 additions & 2 deletions cmd/containerd-stargz-grpc/fsopts/fsopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type Config struct {
}

const (
memoryMetadataType = "memory"
dbMetadataType = "db"
memoryMetadataType = "memory"
dbMetadataType = "db"
)

func ConfigFsOpts(ctx context.Context, rootDir string, config *Config) ([]fs.Option, error) {
Expand Down
19 changes: 7 additions & 12 deletions cmd/containerd-stargz-grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/pelletier/go-toml"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)

const (
Expand All @@ -55,10 +54,10 @@ const (
defaultLogLevel = log.InfoLevel
defaultRootDir = "/var/lib/containerd-stargz-grpc"
defaultImageServiceAddress = "/run/containerd/containerd.sock"
defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-namanger.sock"
defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-manager.sock"

fuseManagerBin = "stargz-fuse-manager"
fuseManagerAddress = "fuse-mananger.sock"
fuseManagerAddress = "fuse-manager.sock"
)

var (
Expand Down Expand Up @@ -148,15 +147,6 @@ func main() {
ImageServicePath: config.CRIKeychainConfig.ImageServicePath,
}

credsFuncs, criServer, err := keychainconfig.ConfigKeychain(ctx, &keyChainConfig)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
}

if config.Config.CRIKeychainConfig.EnableKeychain {
runtime.RegisterImageServiceServer(rpc, criServer)
}

var rs snapshots.Snapshotter
if *detachFuseManager {
fmPath := config.FuseManagerPath
Expand Down Expand Up @@ -197,6 +187,11 @@ func main() {
}
log.G(ctx).Infof("Start snapshotter with fusemanager mode")
} else {
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
}

fsConfig := fsopts.Config{
EnableIpfs: config.IPFS,
MetadataStore: config.MetadataStore,
Expand Down
18 changes: 9 additions & 9 deletions cmd/stargz-fuse-manager/fusemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@ import (

pb "github.com/containerd/stargz-snapshotter/cmd/stargz-fuse-manager/fusemanager/api"
"github.com/containerd/stargz-snapshotter/snapshot"
"google.golang.org/grpc/credentials/insecure"
)

type Client struct {
client pb.StargzFuseManagerServiceClient
}

func NewManagerClient(ctx context.Context, root, socket string, config *Config) (snapshot.FileSystem, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
grpcCli, err := newClient(timeoutCtx, socket)
grpcCli, err := newClient(socket)
if err != nil {
return nil, err
}
Expand All @@ -56,21 +55,22 @@ func NewManagerClient(ctx context.Context, root, socket string, config *Config)
return client, nil
}

func newClient(ctx context.Context, socket string) (pb.StargzFuseManagerServiceClient, error) {
func newClient(socket string) (pb.StargzFuseManagerServiceClient, error) {
backoffConfig := backoff.DefaultConfig
backoffConfig.MaxDelay = 3 * time.Second
connParams := grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
Backoff: backoffConfig,
}

gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.FailOnNonTempDialError(true),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithConnectParams(connParams),
grpc.WithContextDialer(dialer.ContextDialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
}

conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix://%s", socket), gopts...)
conn, err := grpc.NewClient(dialer.DialAddress(fmt.Sprintf("unix://%s", socket)), gopts...)
if err != nil {
return nil, err
}
Expand Down
11 changes: 4 additions & 7 deletions cmd/stargz-fuse-manager/fusemanager/fusemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/containerd/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -135,7 +134,7 @@ func startNew(ctx context.Context, logPath, address, fusestore, logLevel string)
}
go cmd.Wait()

if ready, err := waitUntilReady(ctx, 10); err != nil || !ready {
if ready, err := waitUntilReady(ctx); err != nil || !ready {
if err != nil {
return errors.Wrapf(err, "failed to start new fusemanager")
}
Expand All @@ -147,11 +146,9 @@ func startNew(ctx context.Context, logPath, address, fusestore, logLevel string)
return nil
}

// waitUntilReady waits until fusemanager is ready to accept requests with timeout
func waitUntilReady(ctx context.Context, timeout int) (bool, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
grpcCli, err := newClient(timeoutCtx, address)
// waitUntilReady waits until fusemanager is ready to accept requests
func waitUntilReady(ctx context.Context) (bool, error) {
grpcCli, err := newClient(address)
if err != nil {
return false, err
}
Expand Down
20 changes: 10 additions & 10 deletions cmd/stargz-fuse-manager/fusemanager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ const (
)

type Config struct {
Config *service.Config
IPFS bool
MetadataStore string
Config *service.Config
IPFS bool
MetadataStore string
DefaultImageServiceAddress string
}

Expand Down Expand Up @@ -87,12 +87,12 @@ func NewFuseManager(ctx context.Context, listener net.Listener, server *grpc.Ser
}

fm := &Server{
status: FuseManagerWaitInit,
lock: sync.RWMutex{},
fsMap: sync.Map{},
ms: db,
listener: listener,
server: server,
status: FuseManagerWaitInit,
lock: sync.RWMutex{},
fsMap: sync.Map{},
ms: db,
listener: listener,
server: server,
fuseStoreAddr: fuseStoreAddr,
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func (fm *Server) Init(ctx context.Context, req *pb.InitRequest) (*pb.Response,
ImageServicePath: config.Config.CRIKeychainConfig.ImageServicePath,
}

credsFuncs, _, err := keychainconfig.ConfigKeychain(ctx, &keyChainConfig)
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, fm.server, &keyChainConfig)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
}
Expand Down
2 changes: 1 addition & 1 deletion fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
logLevel = *fsOpts.metricsLogLevel
}
commonmetrics.Register(logLevel) // Register common metrics. This will happen only once.
metrics.Register(ns) // Register layer metrics.
metrics.Register(ns) // Register layer metrics.
}
if metricsCtr == nil {
metricsCtr = layermetrics.NewLayerMetrics(ns)
Expand Down
9 changes: 4 additions & 5 deletions service/keychain/keychainconfig/keychainconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ type Config struct {
ImageServicePath string
}

func ConfigKeychain(ctx context.Context, config *Config) ([]resolver.Credential, runtime.ImageServiceServer, error) {
func ConfigKeychain(ctx context.Context, rpc *grpc.Server, config *Config) ([]resolver.Credential, error) {
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
var criServer runtime.ImageServiceServer
if config.EnableKubeKeychain {
var opts []kubeconfig.Option
if kcp := config.KubeconfigPath; kcp != "" {
Expand All @@ -63,12 +62,12 @@ func ConfigKeychain(ctx context.Context, config *Config) ([]resolver.Credential,
}
return runtime.NewImageServiceClient(conn), nil
}
f, server := cri.NewCRIKeychain(ctx, connectCRI)
f, criServer := cri.NewCRIKeychain(ctx, connectCRI)
runtime.RegisterImageServiceServer(rpc, criServer)
credsFuncs = append(credsFuncs, f)
criServer = server
}

return credsFuncs, criServer, nil
return credsFuncs, nil
}

func newCRIConn(criAddr string) (*grpc.ClientConn, error) {
Expand Down

0 comments on commit 7c333bf

Please sign in to comment.