Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Update CRI from v1alpha1 to v1alpha2 & support down-level compatibility #1359

Merged
merged 1 commit into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ type Config struct {
NetworkPluginConfDir string
// SandboxImage is the image used by sandbox container.
SandboxImage string
// CriVersion is the cri version
CriVersion string
}
114 changes: 114 additions & 0 deletions cri/criservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package cri

import (
"fmt"

criv1alpha1 "github.com/alibaba/pouch/cri/v1alpha1"
servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service"
criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2"
servicev1alpha2 "github.com/alibaba/pouch/cri/v1alpha2/service"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/sirupsen/logrus"
)

// RunCriService start cri service if pouchd is specified with --enable-cri.
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, stopCh chan error) {
var err error

defer func() {
stopCh <- err
close(stopCh)
}()
if !daemonconfig.IsCriEnabled {
return
}
switch daemonconfig.CriConfig.CriVersion {
case "v1alpha1":
err = runv1alpha1(daemonconfig, containerMgr, imageMgr)
case "v1alpha2":
err = runv1alpha2(daemonconfig, containerMgr, imageMgr)
default:
err = fmt.Errorf("invalid CRI version,failed to start CRI service")
}
return
}

// Start CRI service with CRI version: v1alpha1
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
logrus.Infof("Start CRI service with CRI version: v1alpha1")
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
return fmt.Errorf("failed to get CriManager with error: %v", err)
}

service, err := servicev1alpha1.NewService(daemonconfig, criMgr)
if err != nil {
return fmt.Errorf("failed to start CRI service with error: %v", err)
}

// TODO: Stop the whole CRI service if any of the critical service exits
grpcServerCloseCh := make(chan struct{})
go func() {
if err := service.Serve(); err != nil {
logrus.Errorf("failed to start grpc server: %v", err)
}
close(grpcServerCloseCh)
}()

streamServerCloseCh := make(chan struct{})
go func() {
if err := criMgr.StreamServerStart(); err != nil {
logrus.Errorf("failed to start stream server: %v", err)
}
close(streamServerCloseCh)
}()

// TODO: refactor it with select
<-streamServerCloseCh
logrus.Infof("CRI Stream server stopped")
<-grpcServerCloseCh
logrus.Infof("CRI GRPC server stopped")

logrus.Infof("CRI service stopped")
return nil
}

// Start CRI service with CRI version: v1alpha2
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
logrus.Infof("Start CRI service with CRI version: v1alpha2")
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
return fmt.Errorf("failed to get CriManager with error: %v", err)
}

service, err := servicev1alpha2.NewService(daemonconfig, criMgr)
if err != nil {
return fmt.Errorf("failed to start CRI service with error: %v", err)
}
// TODO: Stop the whole CRI service if any of the critical service exits
grpcServerCloseCh := make(chan struct{})
go func() {
if err := service.Serve(); err != nil {
logrus.Errorf("failed to start grpc server: %v", err)
}
close(grpcServerCloseCh)
}()

streamServerCloseCh := make(chan struct{})
go func() {
if err := criMgr.StreamServerStart(); err != nil {
logrus.Errorf("failed to start stream server: %v", err)
}
close(streamServerCloseCh)
}()
// TODO: refactor it with select
<-streamServerCloseCh
logrus.Infof("CRI Stream server stopped")
<-grpcServerCloseCh
logrus.Infof("CRI GRPC server stopped")

logrus.Infof("CRI service stopped")
return nil
}
23 changes: 12 additions & 11 deletions cri/stream/request_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ var (
TokenLen = 8
)

// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// RequestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// random token for their retrieval. The requestCache is used for building streaming URLs without
// the need to encode every request parameter in the URL.
type requestCache struct {
type RequestCache struct {
// tokens maps the generate token to the request for fast retrieval.
tokens map[string]*list.Element
// ll maintains an age-ordered request list for faster garbage collection of expired requests.
Expand All @@ -31,24 +31,25 @@ type requestCache struct {
lock sync.Mutex
}

// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest.
type request interface{}
// Request representing an *ExecRequest, *AttachRequest, or *PortForwardRequest Type.
type Request interface{}

type cacheEntry struct {
token string
req request
req Request
expireTime time.Time
}

func newRequestCache() *requestCache {
return &requestCache{
// NewRequestCache return a RequestCache
func NewRequestCache() *RequestCache {
return &RequestCache{
ll: list.New(),
tokens: make(map[string]*list.Element),
}
}

// Insert the given request into the cache and returns the token used for fetching it out.
func (c *requestCache) Insert(req request) (token string, err error) {
func (c *RequestCache) Insert(req Request) (token string, err error) {
c.lock.Lock()
defer c.lock.Unlock()

Expand All @@ -69,7 +70,7 @@ func (c *requestCache) Insert(req request) (token string, err error) {
}

// Consume the token (remove it from the cache) and return the cached request, if found.
func (c *requestCache) Consume(token string) (req request, found bool) {
func (c *RequestCache) Consume(token string) (req Request, found bool) {
c.lock.Lock()
defer c.lock.Unlock()
ele, ok := c.tokens[token]
Expand All @@ -88,7 +89,7 @@ func (c *requestCache) Consume(token string) (req request, found bool) {
}

// uniqueToken generates a random URL-safe token and ensures uniqueness.
func (c *requestCache) uniqueToken() (string, error) {
func (c *RequestCache) uniqueToken() (string, error) {
const maxTries = 10
// Number of bytes to be TokenLen when base64 encoded.
tokenSize := math.Ceil(float64(TokenLen) * 6 / 8)
Expand All @@ -108,7 +109,7 @@ func (c *requestCache) uniqueToken() (string, error) {
}

// Must be write-locked prior to calling.
func (c *requestCache) gc() {
func (c *RequestCache) gc() {
now := time.Now()
for c.ll.Len() > 0 {
oldest := c.ll.Back()
Expand Down
5 changes: 2 additions & 3 deletions cri/src/cri.go → cri/v1alpha1/cri.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand All @@ -12,7 +12,6 @@ import (
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/errtypes"
Expand Down Expand Up @@ -87,7 +86,7 @@ type CriManager struct {
CniMgr CniMgr

// StreamServer is the stream server of CRI serves container streaming request.
StreamServer stream.Server
StreamServer Server

// SandboxBaseDir is the directory used to store sandbox files like /etc/hosts, /etc/resolv.conf, etc.
SandboxBaseDir string
Expand Down
9 changes: 5 additions & 4 deletions cri/src/cri_network.go → cri/v1alpha1/cri_network.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"fmt"
Expand Down Expand Up @@ -78,9 +78,6 @@ func (c *CniManager) Name() string {
// are launched.
func (c *CniManager) SetUpPodNetwork(podNetwork *ocicni.PodNetwork) error {
_, err := c.plugin.SetUpPod(*podNetwork)
if err != nil {
return fmt.Errorf("failed to setup network for sandbox %q: %v", podNetwork.ID, err)
}

defer func() {
if err != nil {
Expand All @@ -92,6 +89,10 @@ func (c *CniManager) SetUpPodNetwork(podNetwork *ocicni.PodNetwork) error {
}
}()

if err != nil {
return fmt.Errorf("failed to setup network for sandbox %q: %v", podNetwork.ID, err)
}

return nil
}

Expand Down
12 changes: 6 additions & 6 deletions cri/src/cri_stream.go → cri/v1alpha1/cri_stream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand All @@ -11,25 +11,24 @@ import (
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/cri/stream/remotecommand"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/sirupsen/logrus"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (stream.Server, error) {
config := stream.DefaultConfig
func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
config := DefaultConfig
config.Address = net.JoinHostPort(address, port)
runtime := newStreamRuntime(ctrMgr)
return stream.NewServer(config, runtime)
return NewServer(config, runtime)
}

type streamRuntime struct {
containerMgr mgr.ContainerMgr
}

func newStreamRuntime(ctrMgr mgr.ContainerMgr) stream.Runtime {
func newStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime {
return &streamRuntime{containerMgr: ctrMgr}
}

Expand Down Expand Up @@ -61,6 +60,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remot
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
}

// TODO Find a better way instead of the dead loop
var ei *apitypes.ContainerExecInspect
for {
ei, err = s.containerMgr.InspectExec(ctx, execid)
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_types.go → cri/v1alpha1/cri_types.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_utils.go → cri/v1alpha1/cri_utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_wrapper.go → cri/v1alpha1/cri_wrapper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"github.com/sirupsen/logrus"
Expand Down
7 changes: 4 additions & 3 deletions cri/stream/server.go → cri/v1alpha1/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package stream
package v1alpha1

import (
"io"
Expand All @@ -7,6 +7,7 @@ import (
"path"
"time"

"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/cri/stream/constant"
"github.com/alibaba/pouch/cri/stream/portforward"
"github.com/alibaba/pouch/cri/stream/remotecommand"
Expand Down Expand Up @@ -92,7 +93,7 @@ var DefaultConfig = Config{
type server struct {
config Config
runtime Runtime
cache *requestCache
cache *stream.RequestCache
server *http.Server
}

Expand All @@ -101,7 +102,7 @@ func NewServer(config Config, runtime Runtime) (Server, error) {
s := &server{
config: config,
runtime: runtime,
cache: newRequestCache(),
cache: stream.NewRequestCache(),
}

if s.config.BaseURL == nil {
Expand Down
2 changes: 1 addition & 1 deletion cri/service/cri.go → cri/v1alpha1/service/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"syscall"

cri "github.com/alibaba/pouch/cri/src"
cri "github.com/alibaba/pouch/cri/v1alpha1"
"github.com/alibaba/pouch/daemon/config"

"google.golang.org/grpc"
Expand Down
Loading