Skip to content

Commit

Permalink
Review remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed May 7, 2024
1 parent 44330e3 commit 9a3da04
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 19 deletions.
7 changes: 6 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/loop"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

Expand Down Expand Up @@ -502,7 +503,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
client, err := provreqclient.NewProvisioningRequestClient(restConfig)
if err != nil {
return nil, err
}
provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)})
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

// ProvisioningRequestProcessor process ProvisioningRequests in the cluster.
type ProvisioningRequestProcessor interface {
Process(*provreqclient.ProvisioningRequestClient, []*provreqwrapper.ProvisioningRequest)
Process([]*provreqwrapper.ProvisioningRequest)
CleanUp()
}

Expand All @@ -38,12 +37,8 @@ type CombinedProvReqProcessor struct {
}

// NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &CombinedProvReqProcessor{client: client, processors: processors}, nil
func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor) loopstart.Observer {
return &CombinedProvReqProcessor{client: client, processors: processors}
}

// Refresh iterates over ProvisioningRequests and updates its conditions/state.
Expand All @@ -54,7 +49,7 @@ func (cp *CombinedProvReqProcessor) Refresh() {
return
}
for _, p := range cp.processors {
p.Process(cp.client, provReqs)
p.Process(provReqs)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ const (
type checkCapacityProcessor struct {
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
}

// NewCheckCapacityProcessor return ProvisioningRequestProcessor for Check-capacity ProvisioningClass.
func NewCheckCapacityProcessor() *checkCapacityProcessor {
return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated}
func NewCheckCapacityProcessor(client *provreqclient.ProvisioningRequestClient) *checkCapacityProcessor {
return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client}
}

// Process iterates over ProvisioningRequests and apply:
// -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired.
// -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime.
// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningRequestClient, provReqs []*provreqwrapper.ProvisioningRequest) {
func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) {
expiredProvReq := []*provreqwrapper.ProvisioningRequest{}
failedProvReq := []*provreqwrapper.ProvisioningRequest{}
for _, provReq := range provReqs {
Expand Down Expand Up @@ -79,7 +80,7 @@ func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningReque
break
}
conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
_, updErr := client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
_, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
if updErr != nil {
klog.Errorf("failed to add BookingExpired condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
continue
Expand All @@ -91,7 +92,7 @@ func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningReque
break
}
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now()))
_, updErr := client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
_, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
if updErr != nil {
klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func TestProcess(t *testing.T) {
additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
processor := checkCapacityProcessor{func() time.Time { return now }, 1}
processor.Process(provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), []*provreqwrapper.ProvisioningRequest{pr, additionalPr})
processor := checkCapacityProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
if len(test.conditions) == len(test.wantConditions) {
assert.ElementsMatch(t, []metav1.Condition{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package checkcapacity

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -31,7 +33,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"

ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, p
}
_, updErr := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
if updErr != nil {
klog.Errorf("failed to update Provisioned condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
return false, fmt.Errorf("failed to update Provisioned condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
}
return capacityAvailable, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (c *ProvisioningRequestClient) UpdateProvisioningRequest(pr *v1beta1.Provis
ctx, cancel := context.WithTimeout(context.Background(), provisioningRequestClientCallTimeout)
defer cancel()

// UpdateStatus API call on a copy of the PR with cleared Spec field ensures that
// the default null template.metadata.creationTimestamp field of PodTemplateSpec
// will not generate false error logs as a side effect.
prCopy := pr.DeepCopy()
prCopy.Spec = v1beta1.ProvisioningRequestSpec{}
updatedPr, err := c.client.AutoscalingV1beta1().ProvisioningRequests(prCopy.Namespace).UpdateStatus(ctx, prCopy, metav1.UpdateOptions{})
Expand Down

0 comments on commit 9a3da04

Please sign in to comment.