Skip to content

Commit

Permalink
bugfix: move function getListener() to pkg/net/listener.go
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <[email protected]>
  • Loading branch information
YaoZengzeng committed Aug 3, 2018
1 parent 04b5951 commit 0cb3263
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 108 deletions.
75 changes: 2 additions & 73 deletions apis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@ package server

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"strings"
"sync"
"syscall"

"github.com/alibaba/pouch/apis/plugins"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/httputils"
"github.com/alibaba/pouch/pkg/user"
"github.com/alibaba/pouch/pkg/netutils"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -60,7 +57,7 @@ func (s *Server) Start(readyCh chan bool) (err error) {
}

for _, one := range s.Config.Listen {
l, err := getListener(one, tlsConfig)
l, err := netutils.GetListener(one, tlsConfig)
if err != nil {
readyCh <- false
return err
Expand Down Expand Up @@ -100,71 +97,3 @@ func (s *Server) Stop() error {
}
return nil
}

func getListener(addr string, tlsConfig *tls.Config) (net.Listener, error) {
addrParts := strings.SplitN(addr, "://", 2)
if len(addrParts) != 2 {
return nil, fmt.Errorf("invalid listening address: %s", addr)
}

switch addrParts[0] {
case "tcp":
l, err := net.Listen("tcp", addrParts[1])
if err != nil {
return l, err
}
if tlsConfig != nil {
l = tls.NewListener(l, tlsConfig)
}
return l, err
case "unix":
return newUnixSocket(addrParts[1])

default:
return nil, fmt.Errorf("only unix socket or tcp address is support")
}
}

func newUnixSocket(path string) (net.Listener, error) {
if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
return nil, err
}
oldmask := syscall.Umask(0777)
defer syscall.Umask(oldmask)
l, err := net.Listen("unix", path)
if err != nil {
return nil, err
}

// chmod unix socket, make other group writable
if err := os.Chmod(path, 0660); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chmod %s: %s", path, err)
}

gid, err := user.ParseID(user.GroupFile, "pouch", func(line, str string, idInt int, idErr error) (uint32, bool) {
var (
name, placeholder string
id int
)

user.ParseString(line, &name, &placeholder, &id)
if str == name {
return uint32(id), true
}
return 0, false
})
if err != nil {
// ignore error when group pouch not exist, group pouch should to be
// created before pouchd started, it means code not create pouch group
logrus.Warnf("failed to find group pouch, cannot change unix socket %s to pouch group", path)
return l, nil
}

// chown unix socket with group pouch
if err := os.Chown(path, 0, int(gid)); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chown %s: %s", path, err)
}
return l, nil
}
4 changes: 2 additions & 2 deletions cri/v1alpha1/cri_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/mgr"
pouchnet "github.com/alibaba/pouch/pkg/net"
"github.com/alibaba/pouch/pkg/netutils"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
if address == "" {
a, err := pouchnet.ChooseBindAddress(nil)
a, err := netutils.ChooseBindAddress(nil)
if err != nil {
return nil, fmt.Errorf("failed to get stream server address: %v", err)
}
Expand Down
14 changes: 2 additions & 12 deletions cri/v1alpha1/service/cri.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package service

import (
"net"
"os"
"syscall"

cri "github.com/alibaba/pouch/cri/v1alpha1"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/pkg/netutils"

"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand All @@ -27,8 +24,6 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {
criMgr: criMgr,
}

// TODO: Prepare streaming server.

runtime.RegisterRuntimeServiceServer(s.server, s.criMgr)
runtime.RegisterImageServiceServer(s.server, s.criMgr)

Expand All @@ -37,12 +32,7 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {

// Serve starts grpc server.
func (s *Service) Serve() error {
// Unlink to cleanup the previous socket file.
if err := syscall.Unlink(s.config.CriConfig.Listen); err != nil && !os.IsNotExist(err) {
return err
}

l, err := net.Listen("unix", s.config.CriConfig.Listen)
l, err := netutils.GetListener(s.config.CriConfig.Listen, nil)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cri/v1alpha2/cri_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/mgr"
pouchnet "github.com/alibaba/pouch/pkg/net"
"github.com/alibaba/pouch/pkg/netutils"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
if address == "" {
a, err := pouchnet.ChooseBindAddress(nil)
a, err := netutils.ChooseBindAddress(nil)
if err != nil {
return nil, fmt.Errorf("failed to get stream server address: %v", err)
}
Expand Down
14 changes: 2 additions & 12 deletions cri/v1alpha2/service/cri.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package service

import (
"net"
"os"
"syscall"

runtime "github.com/alibaba/pouch/cri/apis/v1alpha2"
cri "github.com/alibaba/pouch/cri/v1alpha2"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/pkg/netutils"

"google.golang.org/grpc"
)
Expand All @@ -27,8 +24,6 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {
criMgr: criMgr,
}

// TODO: Prepare streaming server.

runtime.RegisterRuntimeServiceServer(s.server, s.criMgr)
runtime.RegisterImageServiceServer(s.server, s.criMgr)

Expand All @@ -37,12 +32,7 @@ func NewService(cfg *config.Config, criMgr cri.CriMgr) (*Service, error) {

// Serve starts grpc server.
func (s *Service) Serve() error {
// Unlink to cleanup the previous socket file.
if err := syscall.Unlink(s.config.CriConfig.Listen); err != nil && !os.IsNotExist(err) {
return err
}

l, err := net.Listen("unix", s.config.CriConfig.Listen)
l, err := netutils.GetListener(s.config.CriConfig.Listen, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func setupFlags(cmd *cobra.Command) {
flagSet.StringArrayVarP(&cfg.Listen, "listen", "l", []string{"unix:///var/run/pouchd.sock"}, "Specify listening addresses of Pouchd")
flagSet.BoolVar(&cfg.IsCriEnabled, "enable-cri", false, "Specify whether enable the cri part of pouchd which is used to support Kubernetes")
flagSet.StringVar(&cfg.CriConfig.CriVersion, "cri-version", "v1alpha2", "Specify the version of cri which is used to support Kubernetes")
flagSet.StringVar(&cfg.CriConfig.Listen, "listen-cri", "/var/run/pouchcri.sock", "Specify listening address of CRI")
flagSet.StringVar(&cfg.CriConfig.Listen, "listen-cri", "unix:///var/run/pouchcri.sock", "Specify listening address of CRI")
flagSet.StringVar(&cfg.CriConfig.NetworkPluginBinDir, "cni-bin-dir", "/opt/cni/bin", "The directory for putting cni plugin binaries.")
flagSet.StringVar(&cfg.CriConfig.NetworkPluginConfDir, "cni-conf-dir", "/etc/cni/net.d", "The directory for putting cni plugin configuration files.")
flagSet.StringVar(&cfg.CriConfig.SandboxImage, "sandbox-image", "registry.cn-hangzhou.aliyuncs.com/google-containers/pause-amd64:3.0", "The image used by sandbox container.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/net/doc.go → pkg/netutils/doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package net
package netutils

// NOTE: The code in this package is directly copy from package "k8s.io/apimachinery/pkg/util/net".
// We do this because we don't want to vendor too many irrelevant packages and try to make
Expand Down
2 changes: 1 addition & 1 deletion pkg/net/interface.go → pkg/netutils/interface.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package net
package netutils

import (
"bufio"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package net
package netutils

import (
"fmt"
Expand Down
83 changes: 83 additions & 0 deletions pkg/netutils/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package netutils

import (
"crypto/tls"
"fmt"
"net"
"os"
"strings"
"syscall"

"github.com/alibaba/pouch/pkg/user"

"github.com/sirupsen/logrus"
)

// GetListener get a listener for an address.
func GetListener(addr string, tlsConfig *tls.Config) (net.Listener, error) {
addrParts := strings.SplitN(addr, "://", 2)
if len(addrParts) != 2 {
return nil, fmt.Errorf("invalid listening address %s: must be in format [protocol]://[address]", addr)
}

switch addrParts[0] {
case "tcp":
l, err := net.Listen("tcp", addrParts[1])
if err != nil {
return l, err
}
if tlsConfig != nil {
l = tls.NewListener(l, tlsConfig)
}
return l, err
case "unix":
return newUnixSocket(addrParts[1])

default:
return nil, fmt.Errorf("only unix socket or tcp address is support")
}
}

func newUnixSocket(path string) (net.Listener, error) {
if err := syscall.Unlink(path); err != nil && !os.IsNotExist(err) {
return nil, err
}
oldmask := syscall.Umask(0777)
defer syscall.Umask(oldmask)
l, err := net.Listen("unix", path)
if err != nil {
return nil, err
}

// chmod unix socket, make other group writable
if err := os.Chmod(path, 0660); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chmod %s: %s", path, err)
}

gid, err := user.ParseID(user.GroupFile, "pouch", func(line, str string, idInt int, idErr error) (uint32, bool) {
var (
name, placeholder string
id int
)

user.ParseString(line, &name, &placeholder, &id)
if str == name {
return uint32(id), true
}
return 0, false
})
if err != nil {
// ignore error when group pouch not exist, group pouch should to be
// created before pouchd started, it means code not create pouch group
logrus.Warnf("failed to find group pouch, cannot change unix socket %s to pouch group", path)
return l, nil
}

// chown unix socket with group pouch
if err := os.Chown(path, 0, int(gid)); err != nil {
l.Close()
return nil, fmt.Errorf("failed to chown %s: %s", path, err)
}
return l, nil
}
51 changes: 51 additions & 0 deletions pkg/netutils/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package netutils

import (
"fmt"
"reflect"
"testing"
)

func TestGetListenerBasic(t *testing.T) {
type args struct {
addr string
}
tests := []struct {
name string
args args
wantErr error
}{
{
"tcpAddressTest",
args{"tcp://127.0.0.1:12345"},
nil,
},
{
"unixAddressTest",
args{"unix:///tmp/pouchtest.sock"},
nil,
},
{
"otherProtocolTest",
args{"udp://127.0.0.1:12345"},
fmt.Errorf("only unix socket or tcp address is support"),
},
{
"invalidAddressTest",
args{"invalid address"},
fmt.Errorf("invalid listening address invalid address: must be in format [protocol]://[address]"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l, err := GetListener(tt.args.addr, nil)
if !reflect.DeepEqual(err, tt.wantErr) {
t.Errorf("GetListener() return error %v, want %v", err, tt.wantErr)
}
if err == nil {
l.Close()
}
})
}
}
6 changes: 3 additions & 3 deletions test/z_cli_daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,14 @@ func (suite *PouchDaemonSuite) TestDaemonDefaultRegistry(c *check.C) {
defer dcfg.KillDaemon()
}

// TestDaemonCriEnbaled tests enabling cri part in pouchd.
func (suite *PouchDaemonSuite) TestDaemonCriEnbaled(c *check.C) {
// TestDaemonCriEnabled tests enabling cri part in pouchd.
func (suite *PouchDaemonSuite) TestDaemonCriEnabled(c *check.C) {
dcfg, err := StartDefaultDaemonDebug(
"--enable-cri")
c.Assert(err, check.IsNil)

result := RunWithSpecifiedDaemon(dcfg, "info")
err = util.PartialEqual(result.Combined(), "CriEnabled: true")
err = util.PartialEqual(result.Combined(), "CriEnabled: true")
c.Assert(err, check.IsNil)

defer dcfg.KillDaemon()
Expand Down

0 comments on commit 0cb3263

Please sign in to comment.