Skip to content

Commit

Permalink
cri com and up
Browse files Browse the repository at this point in the history
  • Loading branch information
starnop committed May 16, 2018
1 parent 48575a9 commit 9339a70
Show file tree
Hide file tree
Showing 34 changed files with 32,649 additions and 96 deletions.
2 changes: 2 additions & 0 deletions cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ type Config struct {
NetworkPluginConfDir string
// SandboxImage is the image used by sandbox container.
SandboxImage string
// CriVersion is the cri version
CriVersion string
}
104 changes: 104 additions & 0 deletions cri/criservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package cri

import (
criv1alpha1 "github.com/alibaba/pouch/cri/v1alpha1"
servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service"
criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2"
servicev1alpha2 "github.com/alibaba/pouch/cri/v1alpha2/service"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/sirupsen/logrus"
)

// RunCriService start cri service if pouchd is specified with --enable-cri.
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, stopCh chan error) {
var err error

defer func() {
stopCh <- err
close(stopCh)
}()
if !daemonconfig.IsCriEnabled {
return
}
switch daemonconfig.CriConfig.CriVersion {
case "v1alpha1":
runv1alpha1(daemonconfig, containerMgr, imageMgr)
case "v1alpha2":
runv1alpha2(daemonconfig, containerMgr, imageMgr)
}
return
}

func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) {
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
return
}

service, err := servicev1alpha1.NewService(daemonconfig, criMgr)
if err != nil {
return
}

grpcServerCloseCh := make(chan struct{})
go func() {
if err := service.Serve(); err != nil {
logrus.Errorf("failed to start grpc server: %v", err)
}
close(grpcServerCloseCh)
}()

streamServerCloseCh := make(chan struct{})
go func() {
if err := criMgr.StreamServerStart(); err != nil {
logrus.Errorf("failed to start stream server: %v", err)
}
close(streamServerCloseCh)
}()

<-streamServerCloseCh
logrus.Infof("CRI Stream server stopped")
<-grpcServerCloseCh
logrus.Infof("CRI GRPC server stopped")

logrus.Infof("CRI service stopped")
return
}

func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) {
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr)
if err != nil {
return
}

service, err := servicev1alpha2.NewService(daemonconfig, criMgr)
if err != nil {
return
}

grpcServerCloseCh := make(chan struct{})
go func() {
if err := service.Serve(); err != nil {
logrus.Errorf("failed to start grpc server: %v", err)
}
close(grpcServerCloseCh)
}()

streamServerCloseCh := make(chan struct{})
go func() {
if err := criMgr.StreamServerStart(); err != nil {
logrus.Errorf("failed to start stream server: %v", err)
}
close(streamServerCloseCh)
}()

<-streamServerCloseCh
logrus.Infof("CRI Stream server stopped")
<-grpcServerCloseCh
logrus.Infof("CRI GRPC server stopped")

logrus.Infof("CRI service stopped")
return
}
14 changes: 7 additions & 7 deletions cri/stream/request_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
// random token for their retrieval. The requestCache is used for building streaming URLs without
// the need to encode every request parameter in the URL.
type requestCache struct {
type RequestCache struct {
// tokens maps the generate token to the request for fast retrieval.
tokens map[string]*list.Element
// ll maintains an age-ordered request list for faster garbage collection of expired requests.
Expand All @@ -40,15 +40,15 @@ type cacheEntry struct {
expireTime time.Time
}

func newRequestCache() *requestCache {
return &requestCache{
func NewRequestCache() *RequestCache {
return &RequestCache{
ll: list.New(),
tokens: make(map[string]*list.Element),
}
}

// Insert the given request into the cache and returns the token used for fetching it out.
func (c *requestCache) Insert(req request) (token string, err error) {
func (c *RequestCache) Insert(req request) (token string, err error) {
c.lock.Lock()
defer c.lock.Unlock()

Expand All @@ -69,7 +69,7 @@ func (c *requestCache) Insert(req request) (token string, err error) {
}

// Consume the token (remove it from the cache) and return the cached request, if found.
func (c *requestCache) Consume(token string) (req request, found bool) {
func (c *RequestCache) Consume(token string) (req request, found bool) {
c.lock.Lock()
defer c.lock.Unlock()
ele, ok := c.tokens[token]
Expand All @@ -88,7 +88,7 @@ func (c *requestCache) Consume(token string) (req request, found bool) {
}

// uniqueToken generates a random URL-safe token and ensures uniqueness.
func (c *requestCache) uniqueToken() (string, error) {
func (c *RequestCache) uniqueToken() (string, error) {
const maxTries = 10
// Number of bytes to be TokenLen when base64 encoded.
tokenSize := math.Ceil(float64(TokenLen) * 6 / 8)
Expand All @@ -108,7 +108,7 @@ func (c *requestCache) uniqueToken() (string, error) {
}

// Must be write-locked prior to calling.
func (c *requestCache) gc() {
func (c *RequestCache) gc() {
now := time.Now()
for c.ll.Len() > 0 {
oldest := c.ll.Back()
Expand Down
5 changes: 2 additions & 3 deletions cri/src/cri.go → cri/v1alpha1/cri.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand All @@ -12,7 +12,6 @@ import (
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/errtypes"
Expand Down Expand Up @@ -87,7 +86,7 @@ type CriManager struct {
CniMgr CniMgr

// StreamServer is the stream server of CRI serves container streaming request.
StreamServer stream.Server
StreamServer Server

// SandboxBaseDir is the directory used to store sandbox files like /etc/hosts, /etc/resolv.conf, etc.
SandboxBaseDir string
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_network.go → cri/v1alpha1/cri_network.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"fmt"
Expand Down
11 changes: 5 additions & 6 deletions cri/src/cri_stream.go → cri/v1alpha1/cri_stream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand All @@ -11,25 +11,24 @@ import (
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/cri/stream"
"github.com/alibaba/pouch/cri/stream/remotecommand"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/sirupsen/logrus"
)

func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (stream.Server, error) {
config := stream.DefaultConfig
func newStreamServer(ctrMgr mgr.ContainerMgr, address string, port string) (Server, error) {
config := DefaultConfig
config.Address = net.JoinHostPort(address, port)
runtime := newStreamRuntime(ctrMgr)
return stream.NewServer(config, runtime)
return NewServer(config, runtime)
}

type streamRuntime struct {
containerMgr mgr.ContainerMgr
}

func newStreamRuntime(ctrMgr mgr.ContainerMgr) stream.Runtime {
func newStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime {
return &streamRuntime{containerMgr: ctrMgr}
}

Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_types.go → cri/v1alpha1/cri_types.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down
4 changes: 2 additions & 2 deletions cri/src/cri_utils.go → cri/v1alpha1/cri_utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"bytes"
Expand All @@ -14,8 +14,8 @@ import (
apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/utils"
"github.com/go-openapi/strfmt"

"github.com/go-openapi/strfmt"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
Expand Down
25 changes: 13 additions & 12 deletions cri/src/cri_utils_test.go → cri/v1alpha1/cri_utils_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"fmt"
Expand All @@ -8,6 +8,7 @@ import (

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down Expand Up @@ -281,7 +282,7 @@ func Test_makeSandboxPouchConfig(t *testing.T) {
want *apitypes.ContainerCreateConfig
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -341,7 +342,7 @@ func Test_toCriSandbox(t *testing.T) {
want *runtime.PodSandbox
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -557,7 +558,7 @@ func Test_makeContainerName(t *testing.T) {
args args
want string
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -578,7 +579,7 @@ func Test_modifyContainerNamespaceOptions(t *testing.T) {
name string
args args
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -599,7 +600,7 @@ func Test_applyContainerSecurityContext(t *testing.T) {
args args
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -627,7 +628,7 @@ func TestCriManager_updateCreateConfig(t *testing.T) {
args args
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -652,7 +653,7 @@ func Test_toCriContainer(t *testing.T) {
want *runtime.Container
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -679,7 +680,7 @@ func Test_imageToCriImage(t *testing.T) {
want *runtime.Image
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -710,7 +711,7 @@ func TestCriManager_ensureSandboxImageExists(t *testing.T) {
args args
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -735,7 +736,7 @@ func Test_getUserFromImageUser(t *testing.T) {
want *int64
want1 string
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -759,7 +760,7 @@ func Test_parseUserFromImageUser(t *testing.T) {
args args
want string
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cri/src/cri_wrapper.go → cri/v1alpha1/cri_wrapper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src
package v1alpha1

import (
"github.com/sirupsen/logrus"
Expand Down
Loading

0 comments on commit 9339a70

Please sign in to comment.