diff --git a/cli/factory.go b/cli/factory.go index cab0680106..da1793dd7b 100644 --- a/cli/factory.go +++ b/cli/factory.go @@ -6,13 +6,23 @@ package main import ( - "errors" + "context" "fmt" + "net" + "os" + "os/signal" + "path/filepath" + "syscall" + "github.com/gogo/protobuf/types" + pb "github.com/kata-containers/runtime/protocols/cache" vc "github.com/kata-containers/runtime/virtcontainers" vf "github.com/kata-containers/runtime/virtcontainers/factory" "github.com/kata-containers/runtime/virtcontainers/pkg/oci" + "github.com/pkg/errors" "github.com/urfave/cli" + "golang.org/x/sys/unix" + "google.golang.org/grpc" ) var factorySubCmds = []cli.Command{ @@ -30,6 +40,84 @@ var factoryCLICommand = cli.Command{ }, } +type cacheServer struct { + rpc *grpc.Server + factory vc.Factory +} + +var jsonVMConfig *pb.GrpcVMConfig + +// Config requests base factory config and convert it to gRPC protocol. +func (s *cacheServer) Config(ctx context.Context, empty *types.Empty) (*pb.GrpcVMConfig, error) { + if jsonVMConfig == nil { + config := s.factory.Config() + + var err error + jsonVMConfig, err = config.ToGrpc() + if err != nil { + return nil, err + } + } + + return jsonVMConfig, nil +} + +// GetBaseVM requests a paused VM and convert it to gRPC protocol. +func (s *cacheServer) GetBaseVM(ctx context.Context, empty *types.Empty) (*pb.GrpcVM, error) { + config := s.factory.Config() + + vm, err := s.factory.GetBaseVM(ctx, config) + if err != nil { + return nil, errors.Wrapf(err, "failed to GetBaseVM") + } + + return vm.ToGrpc(config) +} + +func getUnixListener(path string) (net.Listener, error) { + err := os.MkdirAll(filepath.Dir(path), 0755) + if err != nil { + return nil, err + } + if err = unix.Unlink(path); err != nil && !os.IsNotExist(err) { + return nil, err + } + l, err := net.Listen("unix", path) + if err != nil { + return nil, err + } + if err = os.Chmod(path, 0600); err != nil { + l.Close() + return nil, err + } + return l, nil +} + +var handledSignals = []os.Signal{ + syscall.SIGTERM, + syscall.SIGINT, + syscall.SIGPIPE, +} + +func handleSignals(s *cacheServer, signals chan os.Signal) chan struct{} { + done := make(chan struct{}, 1) + go func() { + for { + sig := <-signals + kataLog.WithField("signal", sig).Debug("received signal") + switch sig { + case unix.SIGPIPE: + continue + default: + s.rpc.GracefulStop() + close(done) + return + } + } + }() + return done +} + var initFactoryCommand = cli.Command{ Name: "init", Usage: "initialize a VM factory based on kata-runtime configuration", @@ -44,6 +132,51 @@ var initFactoryCommand = cli.Command{ return errors.New("invalid runtime config") } + if runtimeConfig.FactoryConfig.VMCacheNumber > 0 { + factoryConfig := vf.Config{ + Template: runtimeConfig.FactoryConfig.Template, + Cache: runtimeConfig.FactoryConfig.VMCacheNumber, + VMCache: true, + VMConfig: vc.VMConfig{ + HypervisorType: runtimeConfig.HypervisorType, + HypervisorConfig: runtimeConfig.HypervisorConfig, + AgentType: runtimeConfig.AgentType, + AgentConfig: runtimeConfig.AgentConfig, + ProxyType: runtimeConfig.ProxyType, + ProxyConfig: runtimeConfig.ProxyConfig, + }, + } + f, err := vf.NewFactory(ctx, factoryConfig, false) + if err != nil { + return err + } + defer f.CloseFactory(ctx) + + s := &cacheServer{ + rpc: grpc.NewServer(), + factory: f, + } + pb.RegisterCacheServiceServer(s.rpc, s) + + l, err := getUnixListener(runtimeConfig.FactoryConfig.VMCacheEndpoint) + if err != nil { + return err + } + defer l.Close() + + signals := make(chan os.Signal, 8) + done := handleSignals(s, signals) + signal.Notify(signals, handledSignals...) + + kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server start") + s.rpc.Serve(l) + + <-done + + kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server stop") + return nil + } + if runtimeConfig.FactoryConfig.Template { factoryConfig := vf.Config{ Template: true,