From 0cb3263d740ad790d84c2aee60aecc51b150c5c9 Mon Sep 17 00:00:00 2001 From: YaoZengzeng Date: Thu, 2 Aug 2018 16:59:04 +0800 Subject: [PATCH] bugfix: move function getListener() to pkg/net/listener.go Signed-off-by: YaoZengzeng --- apis/server/server.go | 75 +--------------------- cri/v1alpha1/cri_stream.go | 4 +- cri/v1alpha1/service/cri.go | 14 +---- cri/v1alpha2/cri_stream.go | 4 +- cri/v1alpha2/service/cri.go | 14 +---- main.go | 2 +- pkg/{net => netutils}/doc.go | 2 +- pkg/{net => netutils}/interface.go | 2 +- pkg/{net => netutils}/interface_test.go | 2 +- pkg/netutils/listener.go | 83 +++++++++++++++++++++++++ pkg/netutils/listener_test.go | 51 +++++++++++++++ test/z_cli_daemon_test.go | 6 +- 12 files changed, 151 insertions(+), 108 deletions(-) rename pkg/{net => netutils}/doc.go (92%) rename pkg/{net => netutils}/interface.go (99%) rename pkg/{net => netutils}/interface_test.go (99%) create mode 100644 pkg/netutils/listener.go create mode 100644 pkg/netutils/listener_test.go diff --git a/apis/server/server.go b/apis/server/server.go index 6389aa131..132d3a2b0 100644 --- a/apis/server/server.go +++ b/apis/server/server.go @@ -2,19 +2,16 @@ package server import ( "crypto/tls" - "fmt" "net" "net/http" - "os" "strings" "sync" - "syscall" "github.com/alibaba/pouch/apis/plugins" "github.com/alibaba/pouch/daemon/config" "github.com/alibaba/pouch/daemon/mgr" "github.com/alibaba/pouch/pkg/httputils" - "github.com/alibaba/pouch/pkg/user" + "github.com/alibaba/pouch/pkg/netutils" "github.com/sirupsen/logrus" ) @@ -60,7 +57,7 @@ func (s *Server) Start(readyCh chan bool) (err error) { } for _, one := range s.Config.Listen { - l, err := getListener(one, tlsConfig) + l, err := netutils.GetListener(one, tlsConfig) if err != nil { readyCh <- false return err @@ -100,71 +97,3 @@ func (s *Server) Stop() error { } return nil } - -func getListener(addr string, tlsConfig *tls.Config) (net.Listener, error) { - addrParts := strings.SplitN(addr, "://", 2) - if len(addrParts) != 2 { - return nil, fmt.Errorf("invalid listening address: %s", addr) - } - - switch addrParts[0] { - case "tcp": - l, err := net.Listen("tcp", addrParts[1]) - if err != nil { - return l, err - } - if tlsConfig != nil { - l = tls.NewListener(l, tlsConfig) - } - return l, err - case "unix": - return newUnixSocket(addrParts[1]) - - default: - return nil, fmt.Errorf("only unix socket or tcp address is support") - } -} - -func newUnixSocket(path string) (net.Listener, error) { - if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) { - return nil, err - } - oldmask := syscall.Umask(0777) - defer syscall.Umask(oldmask) - l, err := net.Listen("unix", path) - if err != nil { - return nil, err - } - - // chmod unix socket, make other group writable - if err := os.Chmod(path, 0660); err != nil { - l.Close() - return nil, fmt.Errorf("failed to chmod %s: %s", path, err) - } - - gid, err := user.ParseID(user.GroupFile, "pouch", func(line, str string, idInt int, idErr error) (uint32, bool) { - var ( - name, placeholder string - id int - ) - - user.ParseString(line, &name, &placeholder, &id) - if str == name { - return uint32(id), true - } - return 0, false - }) - if err != nil { - // ignore error when group pouch not exist, group pouch should to be - // created before pouchd started, it means code not create pouch group - logrus.Warnf("failed to find group pouch, cannot change unix socket %s to pouch group", path) - return l, nil - } - - // chown unix socket with group pouch - if err := os.Chown(path, 0, int(gid)); err != nil { - l.Close() - return nil, fmt.Errorf("failed to chown %s: %s", path, err) - } - return l, nil -} diff --git a/cri/v1alpha1/cri_stream.go b/cri/v1alpha1/cri_stream.go index 9a2d66cc5..9799f858a 100644 --- a/cri/v1alpha1/cri_stream.go +++ b/cri/v1alpha1/cri_stream.go @@ -6,12 +6,12 @@ import ( "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/daemon/mgr" - pouchnet "github.com/alibaba/pouch/pkg/net" + "github.com/alibaba/pouch/pkg/netutils" ) func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) { if address == "" { - a, err := pouchnet.ChooseBindAddress(nil) + a, err := netutils.ChooseBindAddress(nil) if err != nil { return nil, fmt.Errorf("failed to get stream server address: %v", err) } diff --git a/cri/v1alpha1/service/cri.go b/cri/v1alpha1/service/cri.go index f5c327bc6..fb949fab2 100644 --- a/cri/v1alpha1/service/cri.go +++ b/cri/v1alpha1/service/cri.go @@ -1,12 +1,9 @@ package service import ( - "net" - "os" - "syscall" - cri "github.com/alibaba/pouch/cri/v1alpha1" "github.com/alibaba/pouch/daemon/config" + "github.com/alibaba/pouch/pkg/netutils" "google.golang.org/grpc" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -27,8 +24,6 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) { criMgr: criMgr, } - // TODO: Prepare streaming server. - runtime.RegisterRuntimeServiceServer(s.server, s.criMgr) runtime.RegisterImageServiceServer(s.server, s.criMgr) @@ -37,12 +32,7 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) { // Serve starts grpc server. func (s *Service) Serve() error { - // Unlink to cleanup the previous socket file. - if err := syscall.Unlink(s.config.CriConfig.Listen); err != nil && !os.IsNotExist(err) { - return err - } - - l, err := net.Listen("unix", s.config.CriConfig.Listen) + l, err := netutils.GetListener(s.config.CriConfig.Listen, nil) if err != nil { return err } diff --git a/cri/v1alpha2/cri_stream.go b/cri/v1alpha2/cri_stream.go index 45f47bc9e..cb1064fba 100644 --- a/cri/v1alpha2/cri_stream.go +++ b/cri/v1alpha2/cri_stream.go @@ -6,12 +6,12 @@ import ( "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/daemon/mgr" - pouchnet "github.com/alibaba/pouch/pkg/net" + "github.com/alibaba/pouch/pkg/netutils" ) func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) { if address == "" { - a, err := pouchnet.ChooseBindAddress(nil) + a, err := netutils.ChooseBindAddress(nil) if err != nil { return nil, fmt.Errorf("failed to get stream server address: %v", err) } diff --git a/cri/v1alpha2/service/cri.go b/cri/v1alpha2/service/cri.go index 8104f4bd0..2a0367f45 100644 --- a/cri/v1alpha2/service/cri.go +++ b/cri/v1alpha2/service/cri.go @@ -1,13 +1,10 @@ package service import ( - "net" - "os" - "syscall" - runtime "github.com/alibaba/pouch/cri/apis/v1alpha2" cri "github.com/alibaba/pouch/cri/v1alpha2" "github.com/alibaba/pouch/daemon/config" + "github.com/alibaba/pouch/pkg/netutils" "google.golang.org/grpc" ) @@ -27,8 +24,6 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) { criMgr: criMgr, } - // TODO: Prepare streaming server. - runtime.RegisterRuntimeServiceServer(s.server, s.criMgr) runtime.RegisterImageServiceServer(s.server, s.criMgr) @@ -37,12 +32,7 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) { // Serve starts grpc server. func (s *Service) Serve() error { - // Unlink to cleanup the previous socket file. - if err := syscall.Unlink(s.config.CriConfig.Listen); err != nil && !os.IsNotExist(err) { - return err - } - - l, err := net.Listen("unix", s.config.CriConfig.Listen) + l, err := netutils.GetListener(s.config.CriConfig.Listen, nil) if err != nil { return err } diff --git a/main.go b/main.go index d77d29faf..28eadd0dd 100644 --- a/main.go +++ b/main.go @@ -72,7 +72,7 @@ func setupFlags(cmd *cobra.Command) { flagSet.StringArrayVarP(&cfg.Listen, "listen", "l", []string{"unix:///var/run/pouchd.sock"}, "Specify listening addresses of Pouchd") flagSet.BoolVar(&cfg.IsCriEnabled, "enable-cri", false, "Specify whether enable the cri part of pouchd which is used to support Kubernetes") flagSet.StringVar(&cfg.CriConfig.CriVersion, "cri-version", "v1alpha2", "Specify the version of cri which is used to support Kubernetes") - flagSet.StringVar(&cfg.CriConfig.Listen, "listen-cri", "/var/run/pouchcri.sock", "Specify listening address of CRI") + flagSet.StringVar(&cfg.CriConfig.Listen, "listen-cri", "unix:///var/run/pouchcri.sock", "Specify listening address of CRI") flagSet.StringVar(&cfg.CriConfig.NetworkPluginBinDir, "cni-bin-dir", "/opt/cni/bin", "The directory for putting cni plugin binaries.") flagSet.StringVar(&cfg.CriConfig.NetworkPluginConfDir, "cni-conf-dir", "/etc/cni/net.d", "The directory for putting cni plugin configuration files.") flagSet.StringVar(&cfg.CriConfig.SandboxImage, "sandbox-image", "registry.cn-hangzhou.aliyuncs.com/google-containers/pause-amd64:3.0", "The image used by sandbox container.") diff --git a/pkg/net/doc.go b/pkg/netutils/doc.go similarity index 92% rename from pkg/net/doc.go rename to pkg/netutils/doc.go index ce749d570..5869bd499 100644 --- a/pkg/net/doc.go +++ b/pkg/netutils/doc.go @@ -1,4 +1,4 @@ -package net +package netutils // NOTE: The code in this package is directly copy from package "k8s.io/apimachinery/pkg/util/net". // We do this because we don't want to vendor too many irrelevant packages and try to make diff --git a/pkg/net/interface.go b/pkg/netutils/interface.go similarity index 99% rename from pkg/net/interface.go rename to pkg/netutils/interface.go index 46f2165cc..4d27ba80c 100644 --- a/pkg/net/interface.go +++ b/pkg/netutils/interface.go @@ -1,4 +1,4 @@ -package net +package netutils import ( "bufio" diff --git a/pkg/net/interface_test.go b/pkg/netutils/interface_test.go similarity index 99% rename from pkg/net/interface_test.go rename to pkg/netutils/interface_test.go index 7bd11e612..589781e53 100644 --- a/pkg/net/interface_test.go +++ b/pkg/netutils/interface_test.go @@ -1,4 +1,4 @@ -package net +package netutils import ( "fmt" diff --git a/pkg/netutils/listener.go b/pkg/netutils/listener.go new file mode 100644 index 000000000..51a288b89 --- /dev/null +++ b/pkg/netutils/listener.go @@ -0,0 +1,83 @@ +package netutils + +import ( + "crypto/tls" + "fmt" + "net" + "os" + "strings" + "syscall" + + "github.com/alibaba/pouch/pkg/user" + + "github.com/sirupsen/logrus" +) + +// GetListener get a listener for an address. +func GetListener(addr string, tlsConfig *tls.Config) (net.Listener, error) { + addrParts := strings.SplitN(addr, "://", 2) + if len(addrParts) != 2 { + return nil, fmt.Errorf("invalid listening address %s: must be in format [protocol]://[address]", addr) + } + + switch addrParts[0] { + case "tcp": + l, err := net.Listen("tcp", addrParts[1]) + if err != nil { + return l, err + } + if tlsConfig != nil { + l = tls.NewListener(l, tlsConfig) + } + return l, err + case "unix": + return newUnixSocket(addrParts[1]) + + default: + return nil, fmt.Errorf("only unix socket or tcp address is support") + } +} + +func newUnixSocket(path string) (net.Listener, error) { + if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) { + return nil, err + } + oldmask := syscall.Umask(0777) + defer syscall.Umask(oldmask) + l, err := net.Listen("unix", path) + if err != nil { + return nil, err + } + + // chmod unix socket, make other group writable + if err := os.Chmod(path, 0660); err != nil { + l.Close() + return nil, fmt.Errorf("failed to chmod %s: %s", path, err) + } + + gid, err := user.ParseID(user.GroupFile, "pouch", func(line, str string, idInt int, idErr error) (uint32, bool) { + var ( + name, placeholder string + id int + ) + + user.ParseString(line, &name, &placeholder, &id) + if str == name { + return uint32(id), true + } + return 0, false + }) + if err != nil { + // ignore error when group pouch not exist, group pouch should to be + // created before pouchd started, it means code not create pouch group + logrus.Warnf("failed to find group pouch, cannot change unix socket %s to pouch group", path) + return l, nil + } + + // chown unix socket with group pouch + if err := os.Chown(path, 0, int(gid)); err != nil { + l.Close() + return nil, fmt.Errorf("failed to chown %s: %s", path, err) + } + return l, nil +} diff --git a/pkg/netutils/listener_test.go b/pkg/netutils/listener_test.go new file mode 100644 index 000000000..0aadd47d9 --- /dev/null +++ b/pkg/netutils/listener_test.go @@ -0,0 +1,51 @@ +package netutils + +import ( + "fmt" + "reflect" + "testing" +) + +func TestGetListenerBasic(t *testing.T) { + type args struct { + addr string + } + tests := []struct { + name string + args args + wantErr error + }{ + { + "tcpAddressTest", + args{"tcp://127.0.0.1:12345"}, + nil, + }, + { + "unixAddressTest", + args{"unix:///tmp/pouchtest.sock"}, + nil, + }, + { + "otherProtocolTest", + args{"udp://127.0.0.1:12345"}, + fmt.Errorf("only unix socket or tcp address is support"), + }, + { + "invalidAddressTest", + args{"invalid address"}, + fmt.Errorf("invalid listening address invalid address: must be in format [protocol]://[address]"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l, err := GetListener(tt.args.addr, nil) + if !reflect.DeepEqual(err, tt.wantErr) { + t.Errorf("GetListener() return error %v, want %v", err, tt.wantErr) + } + if err == nil { + l.Close() + } + }) + } +} diff --git a/test/z_cli_daemon_test.go b/test/z_cli_daemon_test.go index 1bca52b6e..b0c42c196 100644 --- a/test/z_cli_daemon_test.go +++ b/test/z_cli_daemon_test.go @@ -389,14 +389,14 @@ func (suite *PouchDaemonSuite) TestDaemonDefaultRegistry(c *check.C) { defer dcfg.KillDaemon() } -// TestDaemonCriEnbaled tests enabling cri part in pouchd. -func (suite *PouchDaemonSuite) TestDaemonCriEnbaled(c *check.C) { +// TestDaemonCriEnabled tests enabling cri part in pouchd. +func (suite *PouchDaemonSuite) TestDaemonCriEnabled(c *check.C) { dcfg, err := StartDefaultDaemonDebug( "--enable-cri") c.Assert(err, check.IsNil) result := RunWithSpecifiedDaemon(dcfg, "info") - err = util.PartialEqual(result.Combined(), "CriEnabled: true") + err = util.PartialEqual(result.Combined(), "CriEnabled: true") c.Assert(err, check.IsNil) defer dcfg.KillDaemon()