Skip to content

Commit

Permalink
refactor: make CRI Stream Server share port(http server) with pouchd
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <[email protected]>
  • Loading branch information
YaoZengzeng committed Sep 14, 2018
1 parent 7d8d6fc commit e5914d5
Show file tree
Hide file tree
Showing 17 changed files with 369 additions and 81 deletions.
18 changes: 17 additions & 1 deletion apis/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func initRoute(s *Server) http.Handler {
s.addRoute(r, http.MethodDelete, "/volumes/{name:.*}", s.removeVolume)

// network

s.addRoute(r, http.MethodGet, "/networks", s.listNetwork)
s.addRoute(r, http.MethodPost, "/networks/create", s.createNetwork)
s.addRoute(r, http.MethodGet, "/networks/{id:.*}", s.getNetwork)
Expand All @@ -91,6 +90,23 @@ func initRoute(s *Server) http.Handler {
r.Path(versionMatcher + "/metrics").Methods(http.MethodGet).Handler(prometheus.Handler())
r.Path("/metrics").Methods(http.MethodGet).Handler(prometheus.Handler())

// CRI stream server related handlers
if s.StreamRouter != nil {
endpoints := []struct {
path string
handler http.HandlerFunc
}{
{"/exec/{token}", s.StreamRouter.ServeExec},
{"/attach/{token}", s.StreamRouter.ServeAttach},
{"/portforward/{token}", s.StreamRouter.ServePortForward},
}
for _, e := range endpoints {
for _, method := range []string{http.MethodGet, http.MethodPost} {
r.Path(e.path).Methods(method).Handler(e.handler)
}
}
}

if s.Config.Debug || s.Config.EnableProfiler {
profilerSetup(r)
}
Expand Down
2 changes: 2 additions & 0 deletions apis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/alibaba/pouch/apis/plugins"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/httputils"
Expand All @@ -24,6 +25,7 @@ type Server struct {
ImageMgr mgr.ImageMgr
VolumeMgr mgr.VolumeMgr
NetworkMgr mgr.NetworkMgr
StreamRouter stream.Router
listeners []net.Listener
ContainerPlugin plugins.ContainerPlugin
ManagerWhiteList map[string]struct{}
Expand Down
4 changes: 4 additions & 0 deletions cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ type Config struct {
SandboxImage string `json:"sandbox-image,omitempty"`
// CriVersion is the cri version
CriVersion string `json:"cri-version,omitempty"`
// StreamServerPort is the port which cri stream server is listening on.
StreamServerPort string `json:"stream-server-port,omitempty"`
// StreamServerReusePort specify whether cri stream server share port with pouchd.
StreamServerReusePort bool
}
55 changes: 39 additions & 16 deletions cri/criservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cri
import (
"fmt"

"github.com/alibaba/pouch/cri/stream"
criv1alpha1 "github.com/alibaba/pouch/cri/v1alpha1"
servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service"
criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2"
Expand All @@ -14,56 +15,68 @@ import (
)

// RunCriService start cri service if pouchd is specified with --enable-cri.
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, stopCh chan error, readyCh chan bool) {
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, streamRouterCh chan stream.Router, stopCh chan error, readyCh chan bool) {
var err error

defer func() {
stopCh <- err
close(stopCh)
}()
if !daemonconfig.IsCriEnabled {
// the CriService has been disabled, so send Ready
// the CriService has been disabled, so send Ready and empty Stream Router
streamRouterCh <- nil
readyCh <- true
return
}
switch daemonconfig.CriConfig.CriVersion {
case "v1alpha1":
err = runv1alpha1(daemonconfig, containerMgr, imageMgr, readyCh)
err = runv1alpha1(daemonconfig, containerMgr, imageMgr, streamRouterCh, readyCh)
case "v1alpha2":
err = runv1alpha2(daemonconfig, containerMgr, imageMgr, volumeMgr, readyCh)
err = runv1alpha2(daemonconfig, containerMgr, imageMgr, volumeMgr, streamRouterCh, readyCh)
default:
streamRouterCh <- nil
readyCh <- false
err = fmt.Errorf("failed to start CRI service: invalid CRI version %s, expected to be v1alpha1 or v1alpha2", daemonconfig.CriConfig.CriVersion)
}
return
}

// Start CRI service with CRI version: v1alpha1
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, readyCh chan bool) error {
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, streamRouterCh chan stream.Router, readyCh chan bool) error {
logrus.Infof("Start CRI service with CRI version: v1alpha1")
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
streamRouterCh <- nil
readyCh <- false
return fmt.Errorf("failed to get CriManager with error: %v", err)
}

service, err := servicev1alpha1.NewService(daemonconfig, criMgr)
if err != nil {
streamRouterCh <- nil
readyCh <- false
return fmt.Errorf("failed to start CRI service with error: %v", err)
}

errChan := make(chan error, 2)
// If the cri stream server share the port with pouchd,
// export the its router. Otherwise launch it.
if daemonconfig.CriConfig.StreamServerReusePort {
errChan = make(chan error, 1)
streamRouterCh <- criMgr.StreamRouter()
} else {
go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()
streamRouterCh <- nil
}

go func() {
errChan <- service.Serve()
logrus.Infof("CRI GRPC server stopped")
}()

go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()

// the criservice has set up, send Ready
readyCh <- true

Expand All @@ -79,31 +92,41 @@ func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, ima
}

// Start CRI service with CRI version: v1alpha2
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, readyCh chan bool) error {
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, volumeMgr mgr.VolumeMgr, streamRouterCh chan stream.Router, readyCh chan bool) error {
logrus.Infof("Start CRI service with CRI version: v1alpha2")
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr, volumeMgr)
if err != nil {
streamRouterCh <- nil
readyCh <- false
return fmt.Errorf("failed to get CriManager with error: %v", err)
}

service, err := servicev1alpha2.NewService(daemonconfig, criMgr)
if err != nil {
streamRouterCh <- nil
readyCh <- false
return fmt.Errorf("failed to start CRI service with error: %v", err)
}

errChan := make(chan error, 2)
// If the cri stream server share the port with pouchd,
// export the its router. Otherwise launch it.
if daemonconfig.CriConfig.StreamServerReusePort {
errChan = make(chan error, 1)
streamRouterCh <- criMgr.StreamRouter()
} else {
go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()
streamRouterCh <- nil
}

go func() {
errChan <- service.Serve()
logrus.Infof("CRI GRPC server stopped")
}()

go func() {
errChan <- criMgr.StreamServerStart()
logrus.Infof("CRI Stream server stopped")
}()

// the criservice has set up, send Ready
readyCh <- true

Expand Down
15 changes: 15 additions & 0 deletions cri/stream/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package stream

import (
"net/http"
)

// Router exports a set of CRI Stream Server's handlers.
// We could reuse the pouchd's http server to handle
// the Stream Server's requests, so pouchd only has to
// export one port.
type Router interface {
ServeExec(w http.ResponseWriter, r *http.Request)
ServeAttach(w http.ResponseWriter, r *http.Request)
ServePortForward(w http.ResponseWriter, r *http.Request)
}
28 changes: 22 additions & 6 deletions cri/v1alpha1/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
apitypes "github.com/alibaba/pouch/apis/types"
anno "github.com/alibaba/pouch/cri/annotations"
cni "github.com/alibaba/pouch/cri/ocicni"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/errtypes"
Expand Down Expand Up @@ -54,11 +55,6 @@ const (
// nameDelimiter is used to construct pouch container names.
nameDelimiter = "_"

// Address and port of stream server.
// TODO: specify them in the parameters of pouchd.
streamServerAddress = ""
streamServerPort = "10010"

namespaceModeHost = "host"
namespaceModeNone = "none"

Expand Down Expand Up @@ -93,6 +89,9 @@ type CriMgr interface {

// StreamServerStart starts the stream server of CRI.
StreamServerStart() error

// StreamRouter returns the router of Stream Server.
StreamRouter() stream.Router
}

// CriManager is an implementation of interface CriMgr.
Expand Down Expand Up @@ -121,7 +120,19 @@ type CriManager struct {

// NewCriManager creates a brand new cri manager.
func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.ImageMgr) (CriMgr, error) {
streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort)
var streamServerAddress string
streamServerPort := config.CriConfig.StreamServerPort
// If stream server reuse the pouchd's port, extract the port from pouchd's listening addresses.
if config.CriConfig.StreamServerReusePort {
streamServerAddress, streamServerPort = extractIPAndPortFromAddresses(config.Listen)
if streamServerPort == "" {
return nil, fmt.Errorf("failed to extract stream server's ip and port from pouchd's listening addresses")
}
}

// If the reused pouchd's port is https, the url that stream server return should be with https scheme.
reuseHTTPSPort := config.CriConfig.StreamServerReusePort && config.TLS.Key != "" && config.TLS.Cert != ""
streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort, reuseHTTPSPort)
if err != nil {
return nil, fmt.Errorf("failed to create stream server for cri manager: %v", err)
}
Expand Down Expand Up @@ -173,6 +184,11 @@ func (c *CriManager) StreamServerStart() error {
return c.StreamServer.Start()
}

// StreamRouter returns the router of Stream Server.
func (c *CriManager) StreamRouter() stream.Router {
return c.StreamServer
}

// TODO: Move the underlying functions to their respective files in the future.

// Version returns the runtime name, runtime version and runtime API version.
Expand Down
45 changes: 43 additions & 2 deletions cri/v1alpha1/cri_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ package v1alpha1
import (
"fmt"
"net"
"net/url"
"strings"

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/netutils"

"github.com/sirupsen/logrus"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
if address == "" {
func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string, reuseHTTPSPort bool) (Server, error) {
ip := net.ParseIP(address)
// If the address is "" or "0.0.0.0", choose a proper one by ourselves.
if ip == nil || ip.IsUnspecified() {
a, err := netutils.ChooseBindAddress(nil)
if err != nil {
return nil, fmt.Errorf("failed to get stream server address: %v", err)
Expand All @@ -19,6 +25,41 @@ func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Serv
}
config := stream.DefaultConfig
config.Address = net.JoinHostPort(address, port)
config.BaseURL = &url.URL{
Scheme: "http",
Host: config.Address,
}
if reuseHTTPSPort {
config.BaseURL.Scheme = "https"
}
logrus.Infof("Stream Server will bind to address %v", config.Address)

runtime := stream.NewStreamRuntime(ctrMgr)
return NewServer(config, runtime)
}

// extractIPAndPortFromAddresses extract first valid ip and port from addresses.
func extractIPAndPortFromAddresses(addresses []string) (string, string) {
for _, addr := range addresses {
addrParts := strings.SplitN(addr, "://", 2)
if len(addrParts) != 2 {
logrus.Errorf("invalid listening address %s: must be in format [protocol]://[address]", addr)
continue
}

switch addrParts[0] {
case "tcp":
host, port, err := net.SplitHostPort(addrParts[1])
if err != nil {
logrus.Errorf("failed to split host and port from address: %v", err)
continue
}
return host, port
case "unix":
continue
default:
logrus.Errorf("only unix socket or tcp address is support")
}
}
return "", ""
}
50 changes: 50 additions & 0 deletions cri/v1alpha1/cri_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package v1alpha1

import (
"testing"
)

func Test_extractIPAndPortFromAddresses(t *testing.T) {
tests := []struct {
name string
args []string
wantIP string
wantPort string
}{
{
name: "listening addresses are nil",
args: nil,
wantIP: "",
wantPort: "",
},
{
name: "listening addresses have no tcp address",
args: []string{"unix:///var/run/pouchd.sock"},
wantIP: "",
wantPort: "",
},
{
name: "listening addresses have valid address",
args: []string{"unix:///var/run/pouchd.sock", "tcp://0.0.0.0:4345"},
wantIP: "0.0.0.0",
wantPort: "4345",
},
{
name: "listening addresses have two tcp addresses",
args: []string{"tcp://10.10.10.10:1234", "tcp://0.0.0.0:4345"},
wantIP: "10.10.10.10",
wantPort: "1234",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotIP, gotPort := extractIPAndPortFromAddresses(tt.args)
if gotIP != tt.wantIP {
t.Errorf("extractIPAndPortFromAddresses() IP = %v, want IP %v", gotIP, tt.wantIP)
}
if gotPort != tt.wantPort {
t.Errorf("extractIPAndPortFromAddresses() Port = %v, want Port %v", gotPort, tt.wantPort)
}
})
}
}
Loading

0 comments on commit e5914d5

Please sign in to comment.