diff --git a/apis/server/router.go b/apis/server/router.go index a7ddc03bf..3711db1b6 100644 --- a/apis/server/router.go +++ b/apis/server/router.go @@ -79,7 +79,6 @@ func initRoute(s *Server) http.Handler { s.addRoute(r, http.MethodDelete, "/volumes/{name:.*}", s.removeVolume) // network - s.addRoute(r, http.MethodGet, "/networks", s.listNetwork) s.addRoute(r, http.MethodPost, "/networks/create", s.createNetwork) s.addRoute(r, http.MethodGet, "/networks/{id:.*}", s.getNetwork) @@ -91,6 +90,23 @@ func initRoute(s *Server) http.Handler { r.Path(versionMatcher + "/metrics").Methods(http.MethodGet).Handler(prometheus.Handler()) r.Path("/metrics").Methods(http.MethodGet).Handler(prometheus.Handler()) + // CRI stream server related handlers + if s.StreamRouter != nil { + endpoints := []struct { + path string + handler http.HandlerFunc + }{ + {"/exec/{token}", s.StreamRouter.ServeExec}, + {"/attach/{token}", s.StreamRouter.ServeAttach}, + {"/portforward/{token}", s.StreamRouter.ServePortForward}, + } + for _, e := range endpoints { + for _, method := range []string{http.MethodGet, http.MethodPost} { + r.Path(e.path).Methods(method).Handler(e.handler) + } + } + } + if s.Config.Debug || s.Config.EnableProfiler { profilerSetup(r) } diff --git a/apis/server/server.go b/apis/server/server.go index 132d3a2b0..18546672d 100644 --- a/apis/server/server.go +++ b/apis/server/server.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/alibaba/pouch/apis/plugins" + "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/daemon/config" "github.com/alibaba/pouch/daemon/mgr" "github.com/alibaba/pouch/pkg/httputils" @@ -24,6 +25,7 @@ type Server struct { ImageMgr mgr.ImageMgr VolumeMgr mgr.VolumeMgr NetworkMgr mgr.NetworkMgr + StreamRouter stream.Router listeners []net.Listener ContainerPlugin plugins.ContainerPlugin ManagerWhiteList map[string]struct{} diff --git a/cri/config/config.go b/cri/config/config.go index b666f71b2..12db08957 100644 --- a/cri/config/config.go +++ b/cri/config/config.go @@ -17,4 +17,8 @@ type Config struct { SandboxImage string `json:"sandbox-image,omitempty"` // CriVersion is the cri version CriVersion string `json:"cri-version,omitempty"` + // StreamServerPort is the port which cri stream server is listening on. + StreamServerPort string `json:"stream-server-port,omitempty"` + // StreamServerReusePort specify whether cri stream server share port with pouchd. + StreamServerReusePort bool } diff --git a/cri/criservice.go b/cri/criservice.go index 28c41cd60..08d6ca542 100644 --- a/cri/criservice.go +++ b/cri/criservice.go @@ -3,6 +3,7 @@ package cri import ( "fmt" + "github.com/alibaba/pouch/cri/stream" criv1alpha1 "github.com/alibaba/pouch/cri/v1alpha1" servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service" criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2" @@ -14,7 +15,7 @@ import ( ) // RunCriService start cri service if pouchd is specified with --enable-cri. -func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, stopCh chan error, readyCh chan bool) { +func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, streamRouterCh chan stream.Router, stopCh chan error, readyCh chan bool) { var err error defer func() { @@ -22,16 +23,18 @@ func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, i close(stopCh) }() if !daemonconfig.IsCriEnabled { - // the CriService has been disabled, so send Ready + // the CriService has been disabled, so send Ready and empty Stream Router + streamRouterCh <- nil readyCh <- true return } switch daemonconfig.CriConfig.CriVersion { case "v1alpha1": - err = runv1alpha1(daemonconfig, containerMgr, imageMgr, readyCh) + err = runv1alpha1(daemonconfig, containerMgr, imageMgr, streamRouterCh, readyCh) case "v1alpha2": - err = runv1alpha2(daemonconfig, containerMgr, imageMgr, volumeMgr, readyCh) + err = runv1alpha2(daemonconfig, containerMgr, imageMgr, volumeMgr, streamRouterCh, readyCh) default: + streamRouterCh <- nil readyCh <- false err = fmt.Errorf("failed to start CRI service: invalid CRI version %s, expected to be v1alpha1 or v1alpha2", daemonconfig.CriConfig.CriVersion) } @@ -39,31 +42,41 @@ func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, i } // Start CRI service with CRI version: v1alpha1 -func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, readyCh chan bool) error { +func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, streamRouterCh chan stream.Router, readyCh chan bool) error { logrus.Infof("Start CRI service with CRI version: v1alpha1") criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr) if err != nil { + streamRouterCh <- nil readyCh <- false return fmt.Errorf("failed to get CriManager with error: %v", err) } service, err := servicev1alpha1.NewService(daemonconfig, criMgr) if err != nil { + streamRouterCh <- nil readyCh <- false return fmt.Errorf("failed to start CRI service with error: %v", err) } errChan := make(chan error, 2) + // If the cri stream server share the port with pouchd, + // export the its router. Otherwise launch it. + if daemonconfig.CriConfig.StreamServerReusePort { + errChan = make(chan error, 1) + streamRouterCh <- criMgr.StreamRouter() + } else { + go func() { + errChan <- criMgr.StreamServerStart() + logrus.Infof("CRI Stream server stopped") + }() + streamRouterCh <- nil + } + go func() { errChan <- service.Serve() logrus.Infof("CRI GRPC server stopped") }() - go func() { - errChan <- criMgr.StreamServerStart() - logrus.Infof("CRI Stream server stopped") - }() - // the criservice has set up, send Ready readyCh <- true @@ -79,31 +92,41 @@ func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, ima } // Start CRI service with CRI version: v1alpha2 -func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, readyCh chan bool) error { +func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, streamRouterCh chan stream.Router, readyCh chan bool) error { logrus.Infof("Start CRI service with CRI version: v1alpha2") criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr, volumeMgr) if err != nil { + streamRouterCh <- nil readyCh <- false return fmt.Errorf("failed to get CriManager with error: %v", err) } service, err := servicev1alpha2.NewService(daemonconfig, criMgr) if err != nil { + streamRouterCh <- nil readyCh <- false return fmt.Errorf("failed to start CRI service with error: %v", err) } errChan := make(chan error, 2) + // If the cri stream server share the port with pouchd, + // export the its router. Otherwise launch it. + if daemonconfig.CriConfig.StreamServerReusePort { + errChan = make(chan error, 1) + streamRouterCh <- criMgr.StreamRouter() + } else { + go func() { + errChan <- criMgr.StreamServerStart() + logrus.Infof("CRI Stream server stopped") + }() + streamRouterCh <- nil + } + go func() { errChan <- service.Serve() logrus.Infof("CRI GRPC server stopped") }() - go func() { - errChan <- criMgr.StreamServerStart() - logrus.Infof("CRI Stream server stopped") - }() - // the criservice has set up, send Ready readyCh <- true diff --git a/cri/stream/router.go b/cri/stream/router.go new file mode 100644 index 000000000..e0ae25518 --- /dev/null +++ b/cri/stream/router.go @@ -0,0 +1,15 @@ +package stream + +import ( + "net/http" +) + +// Router exports a set of CRI Stream Server's handlers. +// We could reuse the pouchd's http server to handle +// the Stream Server's requests, so pouchd only has to +// export one port. +type Router interface { + ServeExec(w http.ResponseWriter, r *http.Request) + ServeAttach(w http.ResponseWriter, r *http.Request) + ServePortForward(w http.ResponseWriter, r *http.Request) +} diff --git a/cri/v1alpha1/cri.go b/cri/v1alpha1/cri.go index 91c6d60b8..3c75c18b7 100644 --- a/cri/v1alpha1/cri.go +++ b/cri/v1alpha1/cri.go @@ -16,6 +16,7 @@ import ( apitypes "github.com/alibaba/pouch/apis/types" anno "github.com/alibaba/pouch/cri/annotations" cni "github.com/alibaba/pouch/cri/ocicni" + "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/daemon/config" "github.com/alibaba/pouch/daemon/mgr" "github.com/alibaba/pouch/pkg/errtypes" @@ -54,11 +55,6 @@ const ( // nameDelimiter is used to construct pouch container names. nameDelimiter = "_" - // Address and port of stream server. - // TODO: specify them in the parameters of pouchd. - streamServerAddress = "" - streamServerPort = "10010" - namespaceModeHost = "host" namespaceModeNone = "none" @@ -93,6 +89,9 @@ type CriMgr interface { // StreamServerStart starts the stream server of CRI. StreamServerStart() error + + // StreamRouter returns the router of Stream Server. + StreamRouter() stream.Router } // CriManager is an implementation of interface CriMgr. @@ -121,7 +120,19 @@ type CriManager struct { // NewCriManager creates a brand new cri manager. func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.ImageMgr) (CriMgr, error) { - streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort) + var streamServerAddress string + streamServerPort := config.CriConfig.StreamServerPort + // If stream server reuse the pouchd's port, extract the port from pouchd's listening addresses. + if config.CriConfig.StreamServerReusePort { + streamServerAddress, streamServerPort = extractIPAndPortFromAddresses(config.Listen) + if streamServerPort == "" { + return nil, fmt.Errorf("failed to extract stream server's ip and port from pouchd's listening addresses") + } + } + + // If the reused pouchd's port is https, the url that stream server return should be with https scheme. + reuseHTTPSPort := config.CriConfig.StreamServerReusePort && config.TLS.Key != "" && config.TLS.Cert != "" + streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort, reuseHTTPSPort) if err != nil { return nil, fmt.Errorf("failed to create stream server for cri manager: %v", err) } @@ -173,6 +184,11 @@ func (c *CriManager) StreamServerStart() error { return c.StreamServer.Start() } +// StreamRouter returns the router of Stream Server. +func (c *CriManager) StreamRouter() stream.Router { + return c.StreamServer +} + // TODO: Move the underlying functions to their respective files in the future. // Version returns the runtime name, runtime version and runtime API version. diff --git a/cri/v1alpha1/cri_stream.go b/cri/v1alpha1/cri_stream.go index 9799f858a..a27b37814 100644 --- a/cri/v1alpha1/cri_stream.go +++ b/cri/v1alpha1/cri_stream.go @@ -3,14 +3,20 @@ package v1alpha1 import ( "fmt" "net" + "net/url" + "strings" "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/daemon/mgr" "github.com/alibaba/pouch/pkg/netutils" + + "github.com/sirupsen/logrus" ) -func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) { - if address == "" { +func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string, reuseHTTPSPort bool) (Server, error) { + ip := net.ParseIP(address) + // If the address is "" or "0.0.0.0", choose a proper one by ourselves. + if ip == nil || ip.IsUnspecified() { a, err := netutils.ChooseBindAddress(nil) if err != nil { return nil, fmt.Errorf("failed to get stream server address: %v", err) @@ -19,6 +25,41 @@ func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Serv } config := stream.DefaultConfig config.Address = net.JoinHostPort(address, port) + config.BaseURL = &url.URL{ + Scheme: "http", + Host: config.Address, + } + if reuseHTTPSPort { + config.BaseURL.Scheme = "https" + } + logrus.Infof("Stream Server will bind to address %v", config.Address) + runtime := stream.NewStreamRuntime(ctrMgr) return NewServer(config, runtime) } + +// extractIPAndPortFromAddresses extract first valid ip and port from addresses. +func extractIPAndPortFromAddresses(addresses []string) (string, string) { + for _, addr := range addresses { + addrParts := strings.SplitN(addr, "://", 2) + if len(addrParts) != 2 { + logrus.Errorf("invalid listening address %s: must be in format [protocol]://[address]", addr) + continue + } + + switch addrParts[0] { + case "tcp": + host, port, err := net.SplitHostPort(addrParts[1]) + if err != nil { + logrus.Errorf("failed to split host and port from address: %v", err) + continue + } + return host, port + case "unix": + continue + default: + logrus.Errorf("only unix socket or tcp address is support") + } + } + return "", "" +} diff --git a/cri/v1alpha1/cri_stream_test.go b/cri/v1alpha1/cri_stream_test.go new file mode 100644 index 000000000..fe46b8570 --- /dev/null +++ b/cri/v1alpha1/cri_stream_test.go @@ -0,0 +1,50 @@ +package v1alpha1 + +import ( + "testing" +) + +func Test_extractIPAndPortFromAddresses(t *testing.T) { + tests := []struct { + name string + args []string + wantIP string + wantPort string + }{ + { + name: "listening addresses are nil", + args: nil, + wantIP: "", + wantPort: "", + }, + { + name: "listening addresses have no tcp address", + args: []string{"unix:///var/run/pouchd.sock"}, + wantIP: "", + wantPort: "", + }, + { + name: "listening addresses have valid address", + args: []string{"unix:///var/run/pouchd.sock", "tcp://0.0.0.0:4345"}, + wantIP: "0.0.0.0", + wantPort: "4345", + }, + { + name: "listening addresses have two tcp addresses", + args: []string{"tcp://10.10.10.10:1234", "tcp://0.0.0.0:4345"}, + wantIP: "10.10.10.10", + wantPort: "1234", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotIP, gotPort := extractIPAndPortFromAddresses(tt.args) + if gotIP != tt.wantIP { + t.Errorf("extractIPAndPortFromAddresses() IP = %v, want IP %v", gotIP, tt.wantIP) + } + if gotPort != tt.wantPort { + t.Errorf("extractIPAndPortFromAddresses() Port = %v, want Port %v", gotPort, tt.wantPort) + } + }) + } +} diff --git a/cri/v1alpha1/cri_wrapper.go b/cri/v1alpha1/cri_wrapper.go index a111ea93d..954012d66 100644 --- a/cri/v1alpha1/cri_wrapper.go +++ b/cri/v1alpha1/cri_wrapper.go @@ -1,6 +1,8 @@ package v1alpha1 import ( + "github.com/alibaba/pouch/cri/stream" + "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -29,6 +31,11 @@ func (c *CriWrapper) StreamServerStart() (err error) { return c.CriManager.StreamServerStart() } +// StreamRouter returns the router of Stream Server. +func (c *CriWrapper) StreamRouter() stream.Router { + return c.CriManager.StreamRouter() +} + // Version returns the runtime name, runtime version and runtime API version. func (c *CriWrapper) Version(ctx context.Context, r *runtime.VersionRequest) (res *runtime.VersionResponse, err error) { logrus.Debugf("Version shows the basic information of cri Manager") diff --git a/cri/v1alpha1/server.go b/cri/v1alpha1/server.go index 2b5b2d1f2..ea6703ad7 100644 --- a/cri/v1alpha1/server.go +++ b/cri/v1alpha1/server.go @@ -28,6 +28,9 @@ type Server interface { // Start starts the stream server. Start() error + + // Router is the Stream Server's handlers which we should export. + stream.Router } type server struct { @@ -45,20 +48,13 @@ func NewServer(config stream.Config, runtime stream.Runtime) (Server, error) { cache: stream.NewRequestCache(), } - if s.config.BaseURL == nil { - s.config.BaseURL = &url.URL{ - Scheme: "http", - Host: s.config.Address, - } - } - endpoints := []struct { path string handler http.HandlerFunc }{ - {"/exec/{token}", s.serveExec}, - {"/attach/{token}", s.serveAttach}, - {"/portforward/{token}", s.servePortForward}, + {"/exec/{token}", s.ServeExec}, + {"/attach/{token}", s.ServeAttach}, + {"/portforward/{token}", s.ServePortForward}, } r := mux.NewRouter() @@ -81,7 +77,7 @@ func (s *server) Start() error { return s.server.ListenAndServe() } -func (s *server) serveExec(w http.ResponseWriter, r *http.Request) { +func (s *server) ServeExec(w http.ResponseWriter, r *http.Request) { ctx := r.Context() token := mux.Vars(r)["token"] @@ -117,7 +113,7 @@ func (s *server) serveExec(w http.ResponseWriter, r *http.Request) { ) } -func (s *server) serveAttach(w http.ResponseWriter, r *http.Request) { +func (s *server) ServeAttach(w http.ResponseWriter, r *http.Request) { ctx := r.Context() token := mux.Vars(r)["token"] @@ -151,7 +147,7 @@ func (s *server) serveAttach(w http.ResponseWriter, r *http.Request) { ) } -func (s *server) servePortForward(w http.ResponseWriter, r *http.Request) { +func (s *server) ServePortForward(w http.ResponseWriter, r *http.Request) { ctx := r.Context() token := mux.Vars(r)["token"] diff --git a/cri/v1alpha2/cri.go b/cri/v1alpha2/cri.go index 92064c0eb..e1dadbeb0 100644 --- a/cri/v1alpha2/cri.go +++ b/cri/v1alpha2/cri.go @@ -17,6 +17,7 @@ import ( anno "github.com/alibaba/pouch/cri/annotations" runtime "github.com/alibaba/pouch/cri/apis/v1alpha2" cni "github.com/alibaba/pouch/cri/ocicni" + "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/daemon/config" "github.com/alibaba/pouch/daemon/mgr" "github.com/alibaba/pouch/pkg/errtypes" @@ -54,11 +55,6 @@ const ( // nameDelimiter is used to construct pouch container names. nameDelimiter = "_" - // Address and port of stream server. - // TODO: specify them in the parameters of pouchd. - streamServerAddress = "" - streamServerPort = "10011" - namespaceModeHost = "host" namespaceModeNone = "none" @@ -96,6 +92,9 @@ type CriMgr interface { // StreamServerStart starts the stream server of CRI. StreamServerStart() error + + // StreamStart returns the router of Stream Server. + StreamRouter() stream.Router } // CriManager is an implementation of interface CriMgr. @@ -126,7 +125,19 @@ type CriManager struct { // NewCriManager creates a brand new cri manager. func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr) (CriMgr, error) { - streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort) + var streamServerAddress string + streamServerPort := config.CriConfig.StreamServerPort + // If stream server reuse the pouchd's port, extract the ip and port from pouchd's listening addresses. + if config.CriConfig.StreamServerReusePort { + streamServerAddress, streamServerPort = extractIPAndPortFromAddresses(config.Listen) + if streamServerPort == "" { + return nil, fmt.Errorf("failed to extract stream server's port from pouchd's listening addresses") + } + } + + // If the reused pouchd's port is https, the url that stream server return should be with https scheme. + reuseHTTPSPort := config.CriConfig.StreamServerReusePort && config.TLS.Key != "" && config.TLS.Cert != "" + streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort, reuseHTTPSPort) if err != nil { return nil, fmt.Errorf("failed to create stream server for cri manager: %v", err) } @@ -176,6 +187,11 @@ func (c *CriManager) StreamServerStart() error { return c.StreamServer.Start() } +// StreamRouter returns the router of Stream Server. +func (c *CriManager) StreamRouter() stream.Router { + return c.StreamServer +} + // TODO: Move the underlying functions to their respective files in the future. // Version returns the runtime name, runtime version and runtime API version. diff --git a/cri/v1alpha2/cri_stream.go b/cri/v1alpha2/cri_stream.go index cb1064fba..c7508a632 100644 --- a/cri/v1alpha2/cri_stream.go +++ b/cri/v1alpha2/cri_stream.go @@ -3,14 +3,20 @@ package v1alpha2 import ( "fmt" "net" + "net/url" + "strings" "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/daemon/mgr" "github.com/alibaba/pouch/pkg/netutils" + + "github.com/sirupsen/logrus" ) -func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) { - if address == "" { +func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string, reuseHTTPSPort bool) (Server, error) { + ip := net.ParseIP(address) + // If the address is "" or "0.0.0.0", choose a proper one by ourselves. + if ip == nil || ip.IsUnspecified() { a, err := netutils.ChooseBindAddress(nil) if err != nil { return nil, fmt.Errorf("failed to get stream server address: %v", err) @@ -19,6 +25,41 @@ func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Serv } config := stream.DefaultConfig config.Address = net.JoinHostPort(address, port) + config.BaseURL = &url.URL{ + Scheme: "http", + Host: config.Address, + } + if reuseHTTPSPort { + config.BaseURL.Scheme = "https" + } + logrus.Infof("Stream Server will bind to address %v", config.Address) + runtime := stream.NewStreamRuntime(ctrMgr) return NewServer(config, runtime) } + +// extractIPAndPortFromAddresses extract first valid ip and port from addresses. +func extractIPAndPortFromAddresses(addresses []string) (string, string) { + for _, addr := range addresses { + addrParts := strings.SplitN(addr, "://", 2) + if len(addrParts) != 2 { + logrus.Errorf("invalid listening address %s: must be in format [protocol]://[address]", addr) + continue + } + + switch addrParts[0] { + case "tcp": + host, port, err := net.SplitHostPort(addrParts[1]) + if err != nil { + logrus.Errorf("failed to split host and port from address: %v", err) + continue + } + return host, port + case "unix": + continue + default: + logrus.Errorf("only unix socket or tcp address is support") + } + } + return "", "" +} diff --git a/cri/v1alpha2/cri_stream_test.go b/cri/v1alpha2/cri_stream_test.go new file mode 100644 index 000000000..10edf3632 --- /dev/null +++ b/cri/v1alpha2/cri_stream_test.go @@ -0,0 +1,50 @@ +package v1alpha2 + +import ( + "testing" +) + +func Test_extractIPAndPortFromAddresses(t *testing.T) { + tests := []struct { + name string + args []string + wantIP string + wantPort string + }{ + { + name: "listening addresses are nil", + args: nil, + wantIP: "", + wantPort: "", + }, + { + name: "listening addresses have no tcp address", + args: []string{"unix:///var/run/pouchd.sock"}, + wantIP: "", + wantPort: "", + }, + { + name: "listening addresses have valid address", + args: []string{"unix:///var/run/pouchd.sock", "tcp://0.0.0.0:4345"}, + wantIP: "0.0.0.0", + wantPort: "4345", + }, + { + name: "listening addresses have two tcp addresses", + args: []string{"tcp://10.10.10.10:1234", "tcp://0.0.0.0:4345"}, + wantIP: "10.10.10.10", + wantPort: "1234", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotIP, gotPort := extractIPAndPortFromAddresses(tt.args) + if gotIP != tt.wantIP { + t.Errorf("extractIPAndPortFromAddresses() IP = %v, want IP %v", gotIP, tt.wantIP) + } + if gotPort != tt.wantPort { + t.Errorf("extractIPAndPortFromAddresses() Port = %v, want Port %v", gotPort, tt.wantPort) + } + }) + } +} diff --git a/cri/v1alpha2/cri_wrapper.go b/cri/v1alpha2/cri_wrapper.go index 7fe3ad67a..8e886fc6c 100644 --- a/cri/v1alpha2/cri_wrapper.go +++ b/cri/v1alpha2/cri_wrapper.go @@ -2,6 +2,7 @@ package v1alpha2 import ( runtime "github.com/alibaba/pouch/cri/apis/v1alpha2" + "github.com/alibaba/pouch/cri/stream" "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -20,14 +21,21 @@ func NewCriWrapper(c *CriManager) *CriWrapper { // StreamServerStart starts the stream server of CRI. func (c *CriWrapper) StreamServerStart() (err error) { logrus.Infof("StreamServerStart starts stream server of cri manager") - if err != nil { - logrus.Errorf("failed to start StreamServer: %v", err) - } else { - logrus.Infof("success to start StreamServer of cri manager") - } + defer func() { + if err != nil { + logrus.Errorf("failed to start StreamServer: %v", err) + } else { + logrus.Infof("success to start StreamServer of cri manager") + } + }() return c.CriManager.StreamServerStart() } +// StreamRouter returns the router of Stream Server. +func (c *CriWrapper) StreamRouter() stream.Router { + return c.CriManager.StreamRouter() +} + // Version returns the runtime name, runtime version and runtime API version. func (c *CriWrapper) Version(ctx context.Context, r *runtime.VersionRequest) (res *runtime.VersionResponse, err error) { logrus.Debugf("Version shows the basic information of cri Manager") diff --git a/cri/v1alpha2/server.go b/cri/v1alpha2/server.go index de8d3f065..7ee4cecd3 100644 --- a/cri/v1alpha2/server.go +++ b/cri/v1alpha2/server.go @@ -28,6 +28,9 @@ type Server interface { // Start starts the stream server. Start() error + + // Router is the Stream Server's handlers which we should export. + stream.Router } type server struct { @@ -45,20 +48,13 @@ func NewServer(config stream.Config, runtime stream.Runtime) (Server, error) { cache: stream.NewRequestCache(), } - if s.config.BaseURL == nil { - s.config.BaseURL = &url.URL{ - Scheme: "http", - Host: s.config.Address, - } - } - endpoints := []struct { path string handler http.HandlerFunc }{ - {"/exec/{token}", s.serveExec}, - {"/attach/{token}", s.serveAttach}, - {"/portforward/{token}", s.servePortForward}, + {"/exec/{token}", s.ServeExec}, + {"/attach/{token}", s.ServeAttach}, + {"/portforward/{token}", s.ServePortForward}, } r := mux.NewRouter() @@ -81,7 +77,7 @@ func (s *server) Start() error { return s.server.ListenAndServe() } -func (s *server) serveExec(w http.ResponseWriter, r *http.Request) { +func (s *server) ServeExec(w http.ResponseWriter, r *http.Request) { ctx := r.Context() token := mux.Vars(r)["token"] @@ -117,7 +113,7 @@ func (s *server) serveExec(w http.ResponseWriter, r *http.Request) { ) } -func (s *server) serveAttach(w http.ResponseWriter, r *http.Request) { +func (s *server) ServeAttach(w http.ResponseWriter, r *http.Request) { ctx := r.Context() token := mux.Vars(r)["token"] @@ -151,7 +147,7 @@ func (s *server) serveAttach(w http.ResponseWriter, r *http.Request) { ) } -func (s *server) servePortForward(w http.ResponseWriter, r *http.Request) { +func (s *server) ServePortForward(w http.ResponseWriter, r *http.Request) { ctx := r.Context() token := mux.Vars(r)["token"] diff --git a/daemon/daemon.go b/daemon/daemon.go index 66875d118..715b14499 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -10,6 +10,7 @@ import ( "github.com/alibaba/pouch/apis/plugins" "github.com/alibaba/pouch/apis/server" criservice "github.com/alibaba/pouch/cri" + "github.com/alibaba/pouch/cri/stream" "github.com/alibaba/pouch/ctrd" "github.com/alibaba/pouch/daemon/config" "github.com/alibaba/pouch/daemon/events" @@ -197,16 +198,6 @@ func (d *Daemon) Run() error { return err } - d.server = server.Server{ - Config: d.config, - ContainerMgr: containerMgr, - SystemMgr: systemMgr, - ImageMgr: imageMgr, - VolumeMgr: volumeMgr, - NetworkMgr: networkMgr, - ContainerPlugin: d.containerPlugin, - } - // init base network err = d.networkInit(ctx) if err != nil { @@ -216,20 +207,34 @@ func (d *Daemon) Run() error { // set image proxy ctrd.SetImageProxy(d.config.ImageProxy) - httpReadyCh := make(chan bool) + criStreamRouterCh := make(chan stream.Router) criReadyCh := make(chan bool) + criStopCh := make(chan error) + + go criservice.RunCriService(d.config, d.containerMgr, d.imageMgr, d.volumeMgr, criStreamRouterCh, criStopCh, criReadyCh) + + streamRouter := <-criStreamRouterCh - httpServerCloseCh := make(chan struct{}) + d.server = server.Server{ + Config: d.config, + ContainerMgr: containerMgr, + SystemMgr: systemMgr, + ImageMgr: imageMgr, + VolumeMgr: volumeMgr, + NetworkMgr: networkMgr, + StreamRouter: streamRouter, + ContainerPlugin: d.containerPlugin, + } + + httpReadyCh := make(chan bool) + httpCloseCh := make(chan struct{}) go func() { if err := d.server.Start(httpReadyCh); err != nil { logrus.Errorf("failed to start http server: %v", err) } - close(httpServerCloseCh) + close(httpCloseCh) }() - criStopCh := make(chan error) - go criservice.RunCriService(d.config, d.containerMgr, d.imageMgr, d.volumeMgr, criStopCh, criReadyCh) - httpReady := <-httpReadyCh criReady := <-criReadyCh @@ -247,7 +252,7 @@ func (d *Daemon) Run() error { } // Stop pouchd if the server stopped - <-httpServerCloseCh + <-httpCloseCh logrus.Infof("HTTP server stopped") return nil diff --git a/main.go b/main.go index 1e4c0b6ac..6ffa74114 100644 --- a/main.go +++ b/main.go @@ -76,6 +76,8 @@ func setupFlags(cmd *cobra.Command) { flagSet.StringVar(&cfg.CriConfig.NetworkPluginBinDir, "cni-bin-dir", "/opt/cni/bin", "The directory for putting cni plugin binaries.") flagSet.StringVar(&cfg.CriConfig.NetworkPluginConfDir, "cni-conf-dir", "/etc/cni/net.d", "The directory for putting cni plugin configuration files.") flagSet.StringVar(&cfg.CriConfig.SandboxImage, "sandbox-image", "registry.cn-hangzhou.aliyuncs.com/google-containers/pause-amd64:3.0", "The image used by sandbox container.") + flagSet.StringVar(&cfg.CriConfig.StreamServerPort, "stream-server-port", "10010", "The port stream server of cri is listening on.") + flagSet.BoolVar(&cfg.CriConfig.StreamServerReusePort, "stream-server-reuse-port", false, "Specify whether cri stream server share port with pouchd. If this is true, the listen option of pouchd should specify a tcp socket and its port should be same with stream-server-port.") flagSet.BoolVarP(&cfg.Debug, "debug", "D", false, "Switch daemon log level to DEBUG mode") flagSet.StringVarP(&cfg.ContainerdAddr, "containerd", "c", "/var/run/containerd.sock", "Specify listening address of containerd") flagSet.StringVar(&cfg.ContainerdPath, "containerd-path", "", "Specify the path of containerd binary")