Skip to content

Commit

Permalink
refactor(orm): make the code more logical
Browse files Browse the repository at this point in the history
Signed-off-by: Airren <[email protected]>
  • Loading branch information
Airren committed Jul 2, 2024
1 parent 83f23aa commit 0806474
Show file tree
Hide file tree
Showing 8 changed files with 532 additions and 446 deletions.
7 changes: 4 additions & 3 deletions cmd/katalyst-agent/app/options/orm/orm_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
cliflag "k8s.io/component-base/cli/flag"

ormconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/orm"
"github.com/kubewharf/katalyst-core/pkg/consts"
)

type GenericORMPluginOptions struct {
ORMWorkMode string
ORMWorkMode consts.WorkMode
ORMReconcilePeriod time.Duration
ORMResourceNamesMap map[string]string
ORMPodNotifyChanLen int
Expand All @@ -42,7 +43,7 @@ type GenericORMPluginOptions struct {

func NewGenericORMPluginOptions() *GenericORMPluginOptions {
return &GenericORMPluginOptions{
ORMWorkMode: "bypass",
ORMWorkMode: consts.WorkModeBypass,
ORMReconcilePeriod: time.Second * 5,
ORMResourceNamesMap: map[string]string{},
ORMPodNotifyChanLen: 10,
Expand All @@ -61,7 +62,7 @@ func NewGenericORMPluginOptions() *GenericORMPluginOptions {
func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs := fss.FlagSet("orm")

fs.StringVar(&o.ORMWorkMode, "orm-work-mode", o.ORMWorkMode, "orm work mode, nri or bypass")
fs.StringVar((*string)(&o.ORMWorkMode), "orm-work-mode", string(o.ORMWorkMode), "orm work mode, nri or bypass")
fs.DurationVar(&o.ORMReconcilePeriod, "orm-reconcile-period",
o.ORMReconcilePeriod, "orm resource reconcile period")
fs.StringToStringVar(&o.ORMResourceNamesMap, "orm-resource-names-map", o.ORMResourceNamesMap,
Expand Down
184 changes: 12 additions & 172 deletions pkg/agent/orm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ import (
"sync"
"time"

"github.com/containerd/nri/pkg/api"
"github.com/containerd/nri/pkg/stub"
"github.com/opencontainers/selinux/go-selinux"
"google.golang.org/grpc"
"gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
Expand All @@ -46,9 +44,9 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/orm/server"
"github.com/kubewharf/katalyst-core/pkg/agent/orm/server/podresources"
"github.com/kubewharf/katalyst-core/pkg/agent/orm/topology"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
metaserverpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
"github.com/kubewharf/katalyst-core/pkg/metrics"
Expand All @@ -58,14 +56,10 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

type nriConfig struct {
Events []string `json:"events"`
}

type ManagerImpl struct {
ctx context.Context

mode workMode
mode consts.WorkMode

socketname string
socketdir string
Expand Down Expand Up @@ -122,7 +116,7 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me
}

m := &ManagerImpl{
mode: workMode(config.ORMWorkMode),
mode: config.ORMWorkMode,
socketdir: dir,
socketname: file,

Expand Down Expand Up @@ -164,42 +158,17 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me
}

func (m *ManagerImpl) initORMWorkMode(config *config.Configuration) {
if !m.validateNRIMode(config) {
m.mode = workModeBypass
if m.validateNRIMode(config) {
m.mode = consts.WorkModeNri
klog.Infof("[ORM] init ORM work mode with nri mode")
} else {
m.mode = consts.WorkModeBypass
klog.Infof("[ORM] init ORM work mode with bypass mode")
m.resourceExecutor = executor.NewExecutor(cgroupmgr.GetManager())
return
}
klog.Infof("[ORM] init ORM work mode with nri mode")
return
}

// validateNRIMode reports whether the manager can operate in NRI mode with the
// given configuration. It returns false when the configured work mode is not
// NRI, when the NRI socket path does not exist, when the configured handle
// events cannot be parsed into an event mask, or when the NRI stub cannot be
// created. As a side effect it records the stub options, the parsed event
// mask and the created stub on the manager.
func (m *ManagerImpl) validateNRIMode(config *config.Configuration) bool {
	if config.ORMWorkMode != string(workModeNri) {
		return false
	}

	// Only a missing socket is treated as fatal here; other stat errors fall through.
	_, statErr := os.Stat(config.ORMNRISocketPath)
	if os.IsNotExist(statErr) {
		klog.Errorf("[ORM] nri socket path %q does not exist", config.ORMNRISocketPath)
		return false
	}

	nriOpts := []stub.Option{
		stub.WithPluginName(config.ORMNRIPluginName),
		stub.WithPluginIdx(config.ORMNRIPluginIndex),
		stub.WithSocketPath(config.ORMNRISocketPath),
	}
	m.nriOptions = nriOpts

	mask, maskErr := api.ParseEventMask(config.ORMNRIHandleEvents)
	m.nriMask = mask
	if maskErr != nil {
		klog.Errorf("[ORM] parse nri handle events fail: %v", maskErr)
		return false
	}

	// The stub additionally gets an on-close callback that is not stored in m.nriOptions.
	nriStub, stubErr := stub.New(m, append(nriOpts, stub.WithOnClose(m.onClose))...)
	m.nriStub = nriStub
	if stubErr != nil {
		klog.Errorf("[ORM] create nri stub fail: %v", stubErr)
		return false
	}

	return true
}

func (m *ManagerImpl) Run(ctx context.Context) {
klog.V(2).Infof("[ORM] running...")
m.ctx = ctx
Expand Down Expand Up @@ -248,7 +217,7 @@ func (m *ManagerImpl) Run(ctx context.Context) {

klog.V(5).Infof("[ORM] start serve socketPath %v", socketPath)

if m.mode == workModeBypass {
if m.mode == consts.WorkModeBypass {
go func() {
m.process()
}()
Expand Down Expand Up @@ -370,7 +339,7 @@ func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {

// allocate resources for current pod, return after resource allocate when run in NRIMode
err := m.addContainer(pod, container)
if err != nil || m.mode == workModeNri {
if err != nil || m.mode == consts.WorkModeNri {
return err
}

Expand Down Expand Up @@ -452,7 +421,7 @@ func (m *ManagerImpl) processAddPod(podUID string) error {
pod *v1.Pod
err error
)
if m.mode == workModeNri {
if m.mode == consts.WorkModeNri {
nriQueryCtx := context.WithValue(m.ctx, metaserverpod.BypassCacheKey, metaserverpod.BypassCacheTrue)
pod, err = m.metaManager.GetPod(nriQueryCtx, podUID)
} else {
Expand Down Expand Up @@ -683,7 +652,7 @@ func (m *ManagerImpl) reconcile() {
}
}

if m.mode == workModeNri {
if m.mode == consts.WorkModeNri {
containerId, err := native.GetContainerID(pod, container.Name)
if err != nil {
klog.Errorf("[ORM] pod: %s/%s/%s, container: %s, get container id fail: %v",
Expand Down Expand Up @@ -762,135 +731,6 @@ func (m *ManagerImpl) IsContainerRequestResource(container *v1.Container, resour
return false, nil
}

// ********************************** NRI plugin interface implementation **********************************************

// Configure handles the configuration data passed by the runtime. With an
// empty config string it returns the event mask already stored on the manager.
// Otherwise the config is parsed as YAML into m.nriConf, the listed events are
// converted into an event mask, and that mask is stored on the manager and
// returned. A parse failure at either step yields a zero mask and a wrapped error.
func (m *ManagerImpl) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) {
	klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version)

	if len(config) == 0 {
		return m.nriMask, nil
	}

	if unmarshalErr := yaml.Unmarshal([]byte(config), &m.nriConf); unmarshalErr != nil {
		return 0, fmt.Errorf("failed to parse provided configuration: %w", unmarshalErr)
	}

	// The stored mask is overwritten with whatever ParseEventMask returns, even on error.
	mask, parseErr := api.ParseEventMask(m.nriConf.Events...)
	m.nriMask = mask
	if parseErr != nil {
		return 0, fmt.Errorf("failed to parse events in configuration: %w", parseErr)
	}

	klog.V(6).Infof("handle NRI Configure successfully, config %s, runtime %s, version %s",
		config, runtime, version)
	return m.nriMask, nil
}

// Synchronize is part of the NRI plugin interface implemented by ManagerImpl.
// It currently ignores the pods and containers it receives and returns no
// container updates and no error.
func (m *ManagerImpl) Synchronize(_ context.Context, pods []*api.PodSandbox, containers []*api.Container) (
[]*api.ContainerUpdate, error,
) {
// TODO: update existing containers' resources if ORM started after the Pod create events.
return nil, nil
}

// RunPodSandbox logs the sandbox event and runs processAddPod for the pod's
// UID. Any error from processAddPod is logged and returned to the caller.
func (m *ManagerImpl) RunPodSandbox(_ context.Context, pod *api.PodSandbox) error {
	klog.Infof("[ORM] RunPodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid)
	klog.V(6).Infof("[ORM] RunPodSandbox, pod annotations: %v", pod.Annotations)

	if addErr := m.processAddPod(pod.Uid); addErr != nil {
		klog.Errorf("[ORM] RunPodSandbox processAddPod fail, pod: %s/%s/%s, err: %v",
			pod.Namespace, pod.Name, pod.Uid, addErr)
		return addErr
	}
	return nil
}

// CreateContainer builds a container adjustment from the resources recorded
// for the given pod and container. When nothing is recorded it returns nil for
// all results. Otherwise every non-empty cpuset CPUs / cpuset mems allocation
// result is applied to the adjustment; other OCI properties are ignored.
// No container updates are ever returned.
func (m *ManagerImpl) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) (
	*api.ContainerAdjustment, []*api.ContainerUpdate, error,
) {
	klog.Infof("[ORM] CreateContainer, pod: %s/%s/%s, container: %v", pod.Namespace, pod.Name, pod.Uid, container.Name)

	allocations := m.podResources.containerAllResources(pod.Uid, container.Name)
	if allocations == nil {
		klog.V(5).Infof("[ORM] CreateContainer process failed, pod: %s/%s/%s, container: %v, resources nil",
			pod.Namespace, pod.Name, pod.Uid, container.Name)
		return nil, nil, nil
	}

	adjustment := new(api.ContainerAdjustment)
	for _, allocation := range allocations {
		result := allocation.AllocationResult
		if result == "" {
			// Nothing allocated for this resource; leave the adjustment untouched.
			continue
		}
		switch allocation.OciPropertyName {
		case util.OCIPropertyNameCPUSetCPUs:
			adjustment.SetLinuxCPUSetCPUs(result)
		case util.OCIPropertyNameCPUSetMems:
			adjustment.SetLinuxCPUSetMems(result)
		}
	}

	klog.V(5).Infof("[ORM] handle NRI CreateContainer successfully, pod: %s/%s/%s, container: %s, adjust: %v",
		pod.Namespace, pod.Name, pod.Uid, container.Name, adjustment)
	return adjustment, nil, nil
}

// UpdateContainer is part of the NRI plugin interface implemented by
// ManagerImpl. It is currently a no-op: every argument is ignored and it
// returns no container updates and no error.
func (m *ManagerImpl) UpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container, r *api.LinuxResources,
) ([]*api.ContainerUpdate, error) {
// TODO: hook this method to update container resources.
return nil, nil
// Disabled implementation, kept for reference until the hook is enabled:
// containerUpdate := m.getNRIContainerUpdate(pod.Uid, container.Id, container.Name)
// klog.V(5).Infof("[ORM] handle NRI UpdateContainer successfully, pod: %s/%s/%s, container: %s, update: %v",
// pod.Namespace, pod.Name, pod.Uid, container.Name, containerUpdate)
// return []*api.ContainerUpdate{containerUpdate}, nil
}

// RemovePodSandbox logs the sandbox removal and runs processDeletePod for the
// pod's UID. Any error from processDeletePod is logged and returned to the caller.
func (m *ManagerImpl) RemovePodSandbox(_ context.Context, pod *api.PodSandbox) error {
	klog.Infof("[ORM] RemovePodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid)

	if deleteErr := m.processDeletePod(pod.Uid); deleteErr != nil {
		klog.Errorf("[ORM] RemovePodSandbox processDeletePod fail, pod: %s/%s/%s, err: %v",
			pod.Namespace, pod.Name, pod.Uid, deleteErr)
		return deleteErr
	}
	return nil
}

// onClose stops the manager's NRI stub and logs that the NRI server closed.
// It is handed to the stub as its on-close callback (stub.WithOnClose) when
// the stub is created in validateNRIMode.
func (m *ManagerImpl) onClose() {
m.nriStub.Stop()
klog.V(6).Infof("NRI server closes")
}

// updateContainerByNRI builds the NRI container update for the given container
// from the resources recorded for it and sends that update through the NRI stub.
//
// podUID and containerName identify the recorded resources; containerId is the
// runtime container ID placed in the update. A failure is logged (best effort)
// and not returned to the caller.
func (m *ManagerImpl) updateContainerByNRI(podUID, containerId, containerName string) {
	klog.V(2).Infof("[ORM] updateContainerByNRI, pod: %v, container: %v", podUID, containerName)
	containerUpdate := m.getNRIContainerUpdate(podUID, containerId, containerName)
	_, err := m.nriStub.UpdateContainers([]*api.ContainerUpdate{containerUpdate})
	if err != nil {
		// The format string has four verbs; the attempted update fills the "resource"
		// slot, which previously had no argument (err was printed there and the last
		// verb rendered as %!v(MISSING)).
		klog.Errorf("[ORM] updateContainerByNRI fail, pod %v container %v,resource %v, err: %v",
			podUID, containerName, containerUpdate, err)
	}
}

// getNRIContainerUpdate assembles the NRI container update for containerId
// from the resources recorded for the given pod UID and container name. Only
// non-empty cpuset CPUs and cpuset mems allocation results are copied into the
// update; every other OCI property is ignored, and both fields stay empty when
// nothing matching is recorded.
func (m *ManagerImpl) getNRIContainerUpdate(podUID, containerId, containerName string) *api.ContainerUpdate {
	cpuset := &api.LinuxCPU{}

	for _, allocation := range m.podResources.containerAllResources(podUID, containerName) {
		result := allocation.AllocationResult
		if result == "" {
			// Nothing allocated for this resource; keep the current value.
			continue
		}
		switch allocation.OciPropertyName {
		case util.OCIPropertyNameCPUSetCPUs:
			cpuset.Cpus = result
		case util.OCIPropertyNameCPUSetMems:
			cpuset.Mems = result
		}
	}

	return &api.ContainerUpdate{
		ContainerId: containerId,
		Linux: &api.LinuxContainerUpdate{
			Resources: &api.LinuxResources{Cpu: cpuset},
		},
	}
}

// *********************************************************************************************************************

func GetContainerTypeAndIndex(pod *v1.Pod, container *v1.Container) (containerType pluginapi.ContainerType, containerIndex uint64, err error) {
if pod == nil || container == nil {
err = fmt.Errorf("got nil pod: %v or container: %v", pod, container)
Expand Down
Loading

0 comments on commit 0806474

Please sign in to comment.