Skip to content

Commit

Permalink
Merge pull request #1359 from Starnop/cri-upgrade
Browse files Browse the repository at this point in the history
feature: Update CRI from v1alpha1 to v1alpha2 & support down-level compatibility
  • Loading branch information
Wei Fu authored May 23, 2018
2 parents cef9a10 + 206b027 commit 0df5157
Show file tree
Hide file tree
Showing 30 changed files with 31,794 additions and 95 deletions.
2 changes: 2 additions & 0 deletions cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ type Config struct {
NetworkPluginConfDir string
// SandboxImage is the image used by sandbox container.
SandboxImage string
// CriVersion is the cri version
CriVersion string
}
114 changes: 114 additions & 0 deletions cri/criservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package cri

import (
"fmt"

criv1alpha1 "github.com/alibaba/pouch/cri/v1alpha1"
servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service"
criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2"
servicev1alpha2 "github.com/alibaba/pouch/cri/v1alpha2/service"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/sirupsen/logrus"
)

// RunCriService start cri service if pouchd is specified with --enable-cri.
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, stopCh chan error) {
var err error

defer func() {
stopCh <- err
close(stopCh)
}()
if !daemonconfig.IsCriEnabled {
return
}
switch daemonconfig.CriConfig.CriVersion {
case "v1alpha1":
err = runv1alpha1(daemonconfig, containerMgr, imageMgr)
case "v1alpha2":
err = runv1alpha2(daemonconfig, containerMgr, imageMgr)
default:
err = fmt.Errorf("invalid CRI version,failed to start CRI service")
}
return
}

// Start CRI service with CRI version: v1alpha1
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
logrus.Infof("Start CRI service with CRI version: v1alpha1")
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
return fmt.Errorf("failed to get CriManager with error: %v", err)
}

service, err := servicev1alpha1.NewService(daemonconfig, criMgr)
if err != nil {
return fmt.Errorf("failed to start CRI service with error: %v", err)
}

// TODO: Stop the whole CRI service if any of the critical service exits
grpcServerCloseCh := make(chan struct{})
go func() {
if err := service.Serve(); err != nil {
logrus.Errorf("failed to start grpc server: %v", err)
}
close(grpcServerCloseCh)
}()

streamServerCloseCh := make(chan struct{})
go func() {
if err := criMgr.StreamServerStart(); err != nil {
logrus.Errorf("failed to start stream server: %v", err)
}
close(streamServerCloseCh)
}()

// TODO: refactor it with select
<-streamServerCloseCh
logrus.Infof("CRI Stream server stopped")
<-grpcServerCloseCh
logrus.Infof("CRI GRPC server stopped")

logrus.Infof("CRI service stopped")
return nil
}

// Start CRI service with CRI version: v1alpha2
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
logrus.Infof("Start CRI service with CRI version: v1alpha2")
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
return fmt.Errorf("failed to get CriManager with error: %v", err)
}

service, err := servicev1alpha2.NewService(daemonconfig, criMgr)
if err != nil {
return fmt.Errorf("failed to start CRI service with error: %v", err)
}
// TODO: Stop the whole CRI service if any of the critical service exits
grpcServerCloseCh := make(chan struct{})
go func() {
if err := service.Serve(); err != nil {
logrus.Errorf("failed to start grpc server: %v", err)
}
close(grpcServerCloseCh)
}()

streamServerCloseCh := make(chan struct{})
go func() {
if err := criMgr.StreamServerStart(); err != nil {
logrus.Errorf("failed to start stream server: %v", err)
}
close(streamServerCloseCh)
}()
// TODO: refactor it with select
<-streamServerCloseCh
logrus.Infof("CRI Stream server stopped")
<-grpcServerCloseCh
logrus.Infof("CRI GRPC server stopped")

logrus.Infof("CRI service stopped")
return nil
}
23 changes: 12 additions & 11 deletions cri/stream/request_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ var (
TokenLen = 8
)

// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// RequestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// random token for their retrieval. The requestCache is used for building streaming URLs without
// the need to encode every request parameter in the URL.
type requestCache struct {
type RequestCache struct {
// tokens maps the generate token to the request for fast retrieval.
tokens map[string]*list.Element
// ll maintains an age-ordered request list for faster garbage collection of expired requests.
Expand All @@ -31,24 +31,25 @@ type requestCache struct {
lock sync.Mutex
}

// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest.
type request interface{}
// Request representing an *ExecRequest, *AttachRequest, or *PortForwardRequest Type.
type Request interface{}

type cacheEntry struct {
token string
req request
req Request
expireTime time.Time
}

func newRequestCache() *requestCache {
return &requestCache{
// NewRequestCache return a RequestCache
func NewRequestCache() *RequestCache {
return &RequestCache{
ll: list.New(),
tokens: make(map[string]*list.Element),
}
}

// Insert the given request into the cache and returns the token used for fetching it out.
func (c *requestCache) Insert(req request) (token string, err error) {
func (c *RequestCache) Insert(req Request) (token string, err error) {
c.lock.Lock()
defer c.lock.Unlock()

Expand All @@ -69,7 +70,7 @@ func (c *requestCache) Insert(req request) (token string, err error) {
}

// Consume the token (remove it from the cache) and return the cached request, if found.
func (c *requestCache) Consume(token string) (req request, found bool) {
func (c *RequestCache) Consume(token string) (req Request, found bool) {
c.lock.Lock()
defer c.lock.Unlock()
ele, ok := c.tokens[token]
Expand All @@ -88,7 +89,7 @@ func (c *requestCache) Consume(token string) (req request, found bool) {
}

// uniqueToken generates a random URL-safe token and ensures uniqueness.
func (c *requestCache) uniqueToken() (string, error) {
func (c *RequestCache) uniqueToken() (string, error) {
const maxTries = 10
// Number of bytes to be TokenLen when base64 encoded.
tokenSize := math.Ceil(float64(TokenLen) * 6 / 8)
Expand All @@ -108,7 +109,7 @@ func (c *requestCache) uniqueToken() (string, error) {
}

// Must be write-locked prior to calling.
func (c *requestCache) gc() {
func (c *RequestCache) gc() {
now := time.Now()
for c.ll.Len() > 0 {
oldest := c.ll.Back()
Expand Down
5 changes: 2 additions & 3 deletions cri/src/cri.go → cri/v1alpha1/cri.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand All @@ -12,7 +12,6 @@ import (
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/errtypes"
Expand Down Expand Up @@ -87,7 +86,7 @@ type CriManager struct {
CniMgr CniMgr

// StreamServer is the stream server of CRI serves container streaming request.
StreamServer stream.Server
StreamServer Server

// SandboxBaseDir is the directory used to store sandbox files like /etc/hosts, /etc/resolv.conf, etc.
SandboxBaseDir string
Expand Down
9 changes: 5 additions & 4 deletions cri/src/cri_network.go → cri/v1alpha1/cri_network.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"fmt"
Expand Down Expand Up @@ -78,9 +78,6 @@ func (c *CniManager) Name() string {
// are launched.
func (c *CniManager) SetUpPodNetwork(podNetwork *ocicni.PodNetwork) error {
_, err := c.plugin.SetUpPod(*podNetwork)
if err != nil {
return fmt.Errorf("failed to setup network for sandbox %q: %v", podNetwork.ID, err)
}

defer func() {
if err != nil {
Expand All @@ -92,6 +89,10 @@ func (c *CniManager) SetUpPodNetwork(podNetwork *ocicni.PodNetwork) error {
}
}()

if err != nil {
return fmt.Errorf("failed to setup network for sandbox %q: %v", podNetwork.ID, err)
}

return nil
}

Expand Down
12 changes: 6 additions & 6 deletions cri/src/cri_stream.go → cri/v1alpha1/cri_stream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand All @@ -11,25 +11,24 @@ import (
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/cri/stream/remotecommand"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/sirupsen/logrus"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (stream.Server, error) {
config := stream.DefaultConfig
func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
config := DefaultConfig
config.Address = net.JoinHostPort(address, port)
runtime := newStreamRuntime(ctrMgr)
return stream.NewServer(config, runtime)
return NewServer(config, runtime)
}

type streamRuntime struct {
containerMgr mgr.ContainerMgr
}

func newStreamRuntime(ctrMgr mgr.ContainerMgr) stream.Runtime {
func newStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime {
return &streamRuntime{containerMgr: ctrMgr}
}

Expand Down Expand Up @@ -61,6 +60,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remot
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
}

// TODO Find a better way instead of the dead loop
var ei *apitypes.ContainerExecInspect
for {
ei, err = s.containerMgr.InspectExec(ctx, execid)
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_types.go → cri/v1alpha1/cri_types.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_utils.go → cri/v1alpha1/cri_utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_wrapper.go → cri/v1alpha1/cri_wrapper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"github.com/sirupsen/logrus"
Expand Down
7 changes: 4 additions & 3 deletions cri/stream/server.go → cri/v1alpha1/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream
package v1alpha1

import (
"io"
Expand All @@ -7,6 +7,7 @@ import (
"path"
"time"

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/cri/stream/constant"
"github.com/alibaba/pouch/cri/stream/portforward"
"github.com/alibaba/pouch/cri/stream/remotecommand"
Expand Down Expand Up @@ -92,7 +93,7 @@ var DefaultConfig = Config{
type server struct {
config Config
runtime Runtime
cache *requestCache
cache *stream.RequestCache
server *http.Server
}

Expand All @@ -101,7 +102,7 @@ func NewServer(config Config, runtime Runtime) (Server, error) {
s := &server{
config: config,
runtime: runtime,
cache: newRequestCache(),
cache: stream.NewRequestCache(),
}

if s.config.BaseURL == nil {
Expand Down
2 changes: 1 addition & 1 deletion cri/service/cri.go → cri/v1alpha1/service/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"syscall"

cri "github.com/alibaba/pouch/cri/src"
cri "github.com/alibaba/pouch/cri/v1alpha1"
"github.com/alibaba/pouch/daemon/config"

"google.golang.org/grpc"
Expand Down
Loading

0 comments on commit 0df5157

Please sign in to comment.