Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: service group resource subscribe ration #6479

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon"
canalv1 "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/v1"
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/util"
"github.com/erda-project/erda/pkg/http/httpclient"
"github.com/erda-project/erda/pkg/schedule/schedulepolicy/constraintbuilders"
"github.com/erda-project/erda/pkg/strutil"
)

type CanalOperator struct {
k8s addon.K8SUtil
ns addon.NamespaceUtil
secret addon.SecretUtil
pvc addon.PVCUtil
client *httpclient.HTTPClient
k8s addon.K8SUtil
ns addon.NamespaceUtil
overcommit addon.OverCommitUtil
secret addon.SecretUtil
pvc addon.PVCUtil
client *httpclient.HTTPClient
}

func (c *CanalOperator) Name(sg *apistructs.ServiceGroup) string {
Expand All @@ -58,13 +58,15 @@
return c.Namespace(sg) + "/" + c.Name(sg)
}

func New(k8s addon.K8SUtil, ns addon.NamespaceUtil, secret addon.SecretUtil, pvc addon.PVCUtil, client *httpclient.HTTPClient) *CanalOperator {
func New(k8s addon.K8SUtil, ns addon.NamespaceUtil, overcommit addon.OverCommitUtil,
secret addon.SecretUtil, pvc addon.PVCUtil, client *httpclient.HTTPClient) *CanalOperator {
return &CanalOperator{
k8s: k8s,
ns: ns,
secret: secret,
pvc: pvc,
client: client,
k8s: k8s,
ns: ns,
overcommit: overcommit,
secret: secret,
pvc: pvc,
client: client,
}
}

Expand Down Expand Up @@ -131,54 +133,13 @@
return nil
}

func (c *CanalOperator) Convert(sg *apistructs.ServiceGroup) interface{} {
func (c *CanalOperator) Convert(sg *apistructs.ServiceGroup) (any, error) {
canal := sg.Services[0]

scheinfo := sg.ScheduleInfo2
scheinfo.Stateful = true
affinity := constraintbuilders.K8S(&scheinfo, nil, nil, nil).Affinity

resources := corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
}
adminResources := corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
}
if canal.Resources.Cpu != 0 {
cpu := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Cpu*1000)), "m"))
resources.Requests[corev1.ResourceCPU] = cpu

// 1/4
cpu = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Cpu*1000/4)), "m"))
adminResources.Requests[corev1.ResourceCPU] = cpu
}
if canal.Resources.MaxCPU != 0 {
maxCpu := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxCPU*1000)), "m"))
resources.Limits[corev1.ResourceCPU] = maxCpu

// 1/2
maxCpu = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxCPU*1000/2)), "m"))
adminResources.Limits[corev1.ResourceCPU] = maxCpu
}
if canal.Resources.Mem != 0 {
mem := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Mem)), "Mi"))
resources.Requests[corev1.ResourceMemory] = mem

// 1/3
mem = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Mem/3)), "Mi"))
adminResources.Requests[corev1.ResourceMemory] = mem
}
if canal.Resources.MaxMem != 0 {
maxMem := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxMem)), "Mi"))
resources.Limits[corev1.ResourceMemory] = maxMem

// 2/3
maxMem = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxMem*2/3)), "Mi"))
adminResources.Limits[corev1.ResourceMemory] = maxMem
}

v := "v1.1.5"
if canal.Env["CANAL_VERSION"] != "" {
v = canal.Env["CANAL_VERSION"]
Expand Down Expand Up @@ -206,6 +167,19 @@
}
}

workspace, _ := util.GetDiceWorkspaceFromEnvs(canal.Env)
containerResources, err := c.overcommit.ResourceOverCommit(workspace, canal.Resources)
if err != nil {
return nil, fmt.Errorf("failed to calc container resources, err: %v", err)
}

Check warning on line 174 in internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/canal.go

View check run for this annotation

Codecov / codecov/patch

internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/canal.go#L173-L174

Added lines #L173 - L174 were not covered by tests
adminContainerResources, err := c.overcommit.ResourceOverCommit(workspace, apistructs.Resources{
Cpu: canal.Resources.Cpu / 3,
Mem: canal.Resources.Mem * 2 / 3,
})
if err != nil {
return nil, fmt.Errorf("failed to calc admin container resources, err: %v", err)
}

Check warning on line 181 in internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/canal.go

View check run for this annotation

Codecov / codecov/patch

internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/canal.go#L180-L181

Added lines #L180 - L181 were not covered by tests

obj := &canalv1.Canal{
TypeMeta: metav1.TypeMeta{
APIVersion: "database.erda.cloud/v1",
Expand All @@ -221,8 +195,8 @@
Replicas: canal.Scale,

Affinity: &affinity,
Resources: resources,
AdminResources: adminResources,
Resources: containerResources,
AdminResources: adminContainerResources,
Labels: make(map[string]string),
CanalOptions: canalOptions,
AdminOptions: adminOptions,
Expand All @@ -235,7 +209,7 @@

addon.SetAddonLabelsAndAnnotations(canal, obj.Spec.Labels, obj.Spec.Annotations)

return obj
return obj, nil
}

func (c *CanalOperator) Create(k8syml interface{}) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,83 +18,170 @@ import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/sourcecov/mock"
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/mock"
"github.com/erda-project/erda/pkg/http/httpclient"
)

type k8s struct{}
var sg = &apistructs.ServiceGroup{
Dice: apistructs.Dice{
ID: "mock-canal",
Labels: map[string]string{
"USE_OPERATOR": "canal",
},
Services: []apistructs.Service{
{
Name: "canal",
Resources: apistructs.Resources{
Cpu: 3,
Mem: 3072,
},
Scale: 2,
Env: map[string]string{
apistructs.DiceWorkspaceEnvKey: apistructs.WORKSPACE_DEV,
"CANAL_DESTINATION": "example",
"canal.instance.master.address": "mock-mysql.svc.cluster.local:3306",
"canal.instance.dbUsername": "erda",
"canal.instance.dbPassword": "password",
},
},
},
},
}

var sgCanalAdmin = &apistructs.ServiceGroup{
Dice: apistructs.Dice{
ID: "mock-canal",
Labels: map[string]string{
"USE_OPERATOR": "canal",
},
Services: []apistructs.Service{
{
Name: "canal",
Resources: apistructs.Resources{
Cpu: 1,
Mem: 2048,
},
Scale: 2,
Env: map[string]string{
apistructs.DiceWorkspaceEnvKey: apistructs.WORKSPACE_DEV,
"canal.admin.manager": "127.0.0.1:8089",
"spring.datasource.address": "mock-mysql.svc.cluster.local:3306",
"spring.datasource.username": "erda",
"spring.datasource.password": "",
},
},
},
},
}

func (k8s) GetK8SAddr() string {
return ""
var mockResourceRequirements = corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
corev1.ResourceMemory: resource.MustParse("3072Mi"),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
corev1.ResourceMemory: resource.MustParse("3072Mi"),
},
}

var mockAdminResourceRequirements = corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2048Mi"),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2048Mi"),
},
}

func TestCanalOperator(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ns := mock.NewMockNamespaceUtil(ctrl)
// Create mock
namespaceUtil := mock.NewMockNamespaceUtil(ctrl)
overCommitUtil := mock.NewMockOverCommitUtil(ctrl)
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sg.Services[0].Resources).
Return(mockResourceRequirements, nil).AnyTimes()
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sgCanalAdmin.Services[0].Resources).
Return(mockAdminResourceRequirements, nil).AnyTimes()

k8sUtil := mock.NewMockK8SUtil(ctrl)
k8sUtil.EXPECT().GetK8SAddr().Return("mock-k8s-addr").AnyTimes()

mo := New(k8sUtil, namespaceUtil, overCommitUtil, nil, nil, httpclient.New())

t.Run("Test Name and NamespacedName", func(t *testing.T) {
assert.NotPanics(t, func() { mo.Name(sg) })
assert.NotPanics(t, func() { mo.NamespacedName(sg) })
})

t.Run("Test IsSupported", func(t *testing.T) {
assert.NotPanics(t, func() { mo.IsSupported() })
})

t.Run("Test Validate", func(t *testing.T) {
assert.NotPanics(t, func() { mo.Validate(sg) })
})

mo := New(new(k8s), ns, nil, nil, httpclient.New())
sg := new(apistructs.ServiceGroup)
sg.Services = append(sg.Services, apistructs.Service{
Name: "canal",
t.Run("Test Convert", func(t *testing.T) {
_, err := mo.Convert(sg)
assert.NoError(t, err)
})

t.Run("Test CRUD Operations", func(t *testing.T) {
assert.NotPanics(t, func() { mo.Create(sg) })
assert.NotPanics(t, func() { mo.Inspect(sg) })
assert.NotPanics(t, func() { mo.Update(sg) })
assert.NotPanics(t, func() { mo.Remove(sg) })
})
sg.ID = "abcdefghigklmn"
mo.Name(sg)
mo.NamespacedName(sg)
mo.IsSupported()
mo.Validate(sg)
sg.Labels = make(map[string]string)
sg.Labels["USE_OPERATOR"] = "canal"
mo.Validate(sg)
sg.Services[0].Env = make(map[string]string)
mo.Validate(sg)
sg.Services[0].Env["CANAL_DESTINATION"] = "b"
sg.Services[0].Env["canal.instance.master.address"] = "1"
sg.Services[0].Env["canal.instance.master.address"] = "1"
sg.Services[0].Env["canal.instance.dbUsername"] = "2"
sg.Services[0].Env["canal.instance.dbPassword"] = "3"
mo.Validate(sg)
mo.Convert(sg)
mo.Create(sg)
mo.Inspect(sg)
mo.Update(sg)
mo.Remove(sg)
}

func TestCanalOperator2(t *testing.T) {
func TestCanalOperatorAdmin(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ns := mock.NewMockNamespaceUtil(ctrl)
// Create mock
namespaceUtil := mock.NewMockNamespaceUtil(ctrl)
overCommitUtil := mock.NewMockOverCommitUtil(ctrl)
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sg.Services[0].Resources).
Return(mockResourceRequirements, nil).AnyTimes()
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sgCanalAdmin.Services[0].Resources).
Return(mockAdminResourceRequirements, nil).AnyTimes()

k8sUtil := mock.NewMockK8SUtil(ctrl)
k8sUtil.EXPECT().GetK8SAddr().Return("mock-k8s-addr").AnyTimes()

mo := New(k8sUtil, namespaceUtil, overCommitUtil, nil, nil, httpclient.New())

t.Run("Test Name and NamespacedName", func(t *testing.T) {
assert.NotPanics(t, func() { mo.Name(sg) })
assert.NotPanics(t, func() { mo.NamespacedName(sg) })
})

t.Run("Test IsSupported", func(t *testing.T) {
assert.NotPanics(t, func() { mo.IsSupported() })
})

t.Run("Test Validate", func(t *testing.T) {
assert.NotPanics(t, func() { mo.Validate(sg) })
})

t.Run("Test Convert", func(t *testing.T) {
assert.NotPanics(t, func() { mo.Convert(sg) })
})

mo := New(new(k8s), ns, nil, nil, httpclient.New())
sg := new(apistructs.ServiceGroup)
sg.Services = append(sg.Services, apistructs.Service{
Name: "canal",
t.Run("Test CRUD Operations", func(t *testing.T) {
assert.NotPanics(t, func() { mo.Create(sg) })
assert.NotPanics(t, func() { mo.Inspect(sg) })
assert.NotPanics(t, func() { mo.Update(sg) })
assert.NotPanics(t, func() { mo.Remove(sg) })
})
sg.ID = "abcdefghigklmn"
mo.Name(sg)
mo.NamespacedName(sg)
mo.IsSupported()
mo.Validate(sg)
sg.Labels = make(map[string]string)
sg.Labels["USE_OPERATOR"] = "canal"
mo.Validate(sg)
sg.Services[0].Env = make(map[string]string)
mo.Validate(sg)
sg.Services[0].Env["CANAL_DESTINATION"] = "b"
sg.Services[0].Env["canal.admin.manager"] = "127.0.0.1:8089"
sg.Services[0].Env["spring.datasource.address"] = "1"
sg.Services[0].Env["spring.datasource.address"] = "1"
sg.Services[0].Env["spring.datasource.username"] = "2"
sg.Services[0].Env["spring.datasource.password"] = "3"
mo.Validate(sg)
mo.Convert(sg)
mo.Create(sg)
mo.Inspect(sg)
mo.Update(sg)
mo.Remove(sg)
}
Loading
Loading