chore: migrate knative logger to controller-runtime injected logger (#1050)

Signed-off-by: Feruzjon Muyassarov <[email protected]>
Co-authored-by: Jonathan Innis <[email protected]>
fmuyassarov and jonathan-innis authored May 21, 2024
1 parent ddc8970 commit 9b145a6
Showing 69 changed files with 305 additions and 250 deletions.
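
The call-site translation repeated throughout the diff follows a single pattern: knative's sugared logger obtained via logging.FromContext(ctx) (Debugf, Errorf, With) becomes controller-runtime's injected logr.Logger (V(1).Info, Error(err, ...), WithValues). The sketch below is illustrative only (it is not code from this commit, and the function names are made up), but it mirrors the mapping applied across the changed files:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/log"
)

// migrateExample (a hypothetical helper) mirrors the call-site translation in this commit:
//   logging.FromContext(ctx).Debugf(...)             -> log.FromContext(ctx).V(1).Info(...)
//   logging.FromContext(ctx).Errorf("..., %s", err)  -> log.FromContext(ctx).Error(err, "...")
//   logging.FromContext(ctx).With(k, v)              -> log.FromContext(ctx).WithValues(k, v)
func migrateExample(ctx context.Context) {
	logger := log.FromContext(ctx)

	// Debug-level messages become V(1) Info calls.
	logger.V(1).Info("waiting on cluster sync")

	// Errors are passed as the first argument instead of being formatted into the message.
	err := errors.New("nodepool list failed")
	logger.Error(err, "failed listing nodepools")

	// Structured key/value context replaces knative's With(...).
	logger.WithValues("command-id", "example-id").Info(fmt.Sprintf("disrupting via %s", "consolidation"))
}

func main() {
	// log.FromContext returns a delegating logger when none has been injected;
	// messages are simply dropped until SetLogger is called, so this runs
	// without wiring up zap or a controller manager.
	migrateExample(context.Background())
}
```
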
5 changes: 2 additions & 3 deletions go.mod
@@ -8,7 +8,6 @@ require (
github.com/awslabs/operatorpkg v0.0.0-20240514175841-edb8fe5824b4
github.com/docker/docker v26.1.3+incompatible
github.com/go-logr/logr v1.4.1
github.com/go-logr/zapr v1.3.0
github.com/imdario/mergo v0.3.16
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo/v2 v2.17.3
@@ -35,8 +34,6 @@ require (
sigs.k8s.io/controller-runtime v0.18.2
)

require github.com/go-task/slim-sprig/v3 v3.0.0 // indirect

require (
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
@@ -52,9 +49,11 @@ require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/zapr v1.3.0
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
4 changes: 2 additions & 2 deletions kwok/main.go
@@ -17,7 +17,7 @@ limitations under the License.
package main

import (
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/log"

kwok "sigs.k8s.io/karpenter/kwok/cloudprovider"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
@@ -41,7 +41,7 @@ func main() {
ctx, op := operator.NewOperator()
instanceTypes, err := kwok.ConstructInstanceTypes()
if err != nil {
logging.FromContext(ctx).Fatalf("failed to construct instance types: %s", err)
log.FromContext(ctx).Error(err, "failed constructing instance types")
}

cloudProvider := kwok.NewCloudProvider(ctx, op.GetClient(), instanceTypes)
3 changes: 2 additions & 1 deletion pkg/apis/v1beta1/suite_test.go
@@ -24,7 +24,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
. "knative.dev/pkg/logging/testing"

. "sigs.k8s.io/karpenter/pkg/utils/testing"

"sigs.k8s.io/karpenter/pkg/apis"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
18 changes: 9 additions & 9 deletions pkg/controllers/disruption/controller.go
@@ -19,6 +19,7 @@ package disruption
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
@@ -27,13 +28,11 @@ import (
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
@@ -43,6 +42,7 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/controller"
operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"
)

type Controller struct {
@@ -110,7 +110,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
// with making any scheduling decision off of our state nodes. Otherwise, we have the potential to make
// a scheduling decision based on a smaller subset of nodes in our cluster state than actually exist.
if !c.cluster.Synced(ctx) {
logging.FromContext(ctx).Debugf("waiting on cluster sync")
log.FromContext(ctx).V(1).Info("waiting on cluster sync")
return reconcile.Result{RequeueAfter: time.Second}, nil
}

@@ -185,7 +185,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
// 3. Add Command to orchestration.Queue to wait to delete the candidates.
func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
commandID := uuid.NewUUID()
logging.FromContext(ctx).With("command-id", commandID).Infof("disrupting via %s %s", m.Type(), cmd)
log.FromContext(ctx).WithValues("command-id", commandID).Info(fmt.Sprintf("disrupting via %s %s", m.Type(), cmd))

stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
@@ -214,7 +214,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// tainted with the Karpenter taint, the provisioning controller will continue
// to do scheduling simulations and nominate the pods on the candidate nodes until
// the node is cleaned up.
schedulingResults.Record(logging.WithLogger(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)
schedulingResults.Record(log.IntoContext(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)

providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
@@ -275,7 +275,7 @@ func (c *Controller) logAbnormalRuns(ctx context.Context) {
defer c.mu.Unlock()
for name, runTime := range c.lastRun {
if timeSince := c.clock.Since(runTime); timeSince > AbnormalTimeLimit {
logging.FromContext(ctx).Debugf("abnormal time between runs of %s = %s", name, timeSince)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abnormal time between runs of %s = %s", name, timeSince))
}
}
}
@@ -284,7 +284,7 @@ func (c *Controller) logAbnormalRuns(ctx context.Context) {
func (c *Controller) logInvalidBudgets(ctx context.Context) {
nodePoolList := &v1beta1.NodePoolList{}
if err := c.kubeClient.List(ctx, nodePoolList); err != nil {
logging.FromContext(ctx).Errorf("listing nodepools, %s", err)
log.FromContext(ctx).Error(err, "failed listing nodepools")
return
}
var buf bytes.Buffer
@@ -295,6 +295,6 @@ func (c *Controller) logInvalidBudgets(ctx context.Context) {
}
}
if buf.Len() > 0 {
logging.FromContext(ctx).Errorf("detected disruption budget errors: %s", buf.String())
log.FromContext(ctx).Error(errors.New(buf.String()), "detected disruption budget errors")
}
}
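
In executeCommand above, schedulingResults.Record receives log.IntoContext(ctx, operatorlogging.NopLogger) so that pod-nomination logging stays quiet during disruption simulations. Below is a minimal sketch of that suppression idiom, using logr.Discard() as a stand-in for the package's NopLogger (an assumption; the real value lives in sigs.k8s.io/karpenter/pkg/operator/logging) and a hypothetical recordResults helper:

```go
package main

import (
	"context"

	"github.com/go-logr/logr"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

// recordResults stands in for schedulingResults.Record: whatever it logs through
// the context-injected logger is dropped when a no-op logger was injected.
func recordResults(ctx context.Context) {
	log.FromContext(ctx).Info("would-be noisy scheduling output")
}

func main() {
	ctx := context.Background()

	// Inject a discard logger so downstream FromContext calls become no-ops,
	// mirroring log.IntoContext(ctx, operatorlogging.NopLogger) in the diff.
	quiet := log.IntoContext(ctx, logr.Discard())
	recordResults(quiet) // nothing is emitted
}
```
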
7 changes: 4 additions & 3 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -19,9 +19,10 @@ package disruption
import (
"context"
"errors"
"fmt"

"github.com/samber/lo"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/metrics"
@@ -89,7 +90,7 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, err
@@ -99,7 +100,7 @@
if lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
return len(c.reschedulablePods) != 0
}) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}

4 changes: 2 additions & 2 deletions pkg/controllers/disruption/expiration.go
@@ -23,8 +23,8 @@ import (

"k8s.io/utils/clock"

"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
@@ -112,7 +112,7 @@ func (e *Expiration) ComputeCommand(ctx context.Context, disruptionBudgetMapping
e.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, results.NonPendingPodSchedulingErrors())...)
continue
}
logging.FromContext(ctx).With("ttl", candidates[0].nodePool.Spec.Disruption.ExpireAfter.String()).Infof("triggering termination for expired node after TTL")
log.FromContext(ctx).WithValues("ttl", candidates[0].nodePool.Spec.Disruption.ExpireAfter.String()).Info("triggering termination for expired node after TTL")
return Command{
candidates: []*Candidate{candidate},
replacements: results.NewNodeClaims,
11 changes: 5 additions & 6 deletions pkg/controllers/disruption/helpers.go
@@ -31,8 +31,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
@@ -82,7 +82,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
pods = append(pods, n.reschedulablePods...)
}
pods = append(pods, deletingNodePods...)
scheduler, err := provisioner.NewScheduler(logging.WithLogger(ctx, operatorlogging.NopLogger), pods, stateNodes)
scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes)
if err != nil {
return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
@@ -91,7 +91,7 @@
return client.ObjectKeyFromObject(p), nil
})

results := scheduler.Solve(logging.WithLogger(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
for _, n := range results.ExistingNodes {
// We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
// tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
@@ -150,8 +150,7 @@ func GetPodEvictionCost(ctx context.Context, p *v1.Pod) float64 {
if ok {
podDeletionCost, err := strconv.ParseFloat(podDeletionCostStr, 64)
if err != nil {
logging.FromContext(ctx).Errorf("parsing %s=%s from pod %s, %s",
v1.PodDeletionCost, podDeletionCostStr, client.ObjectKeyFromObject(p), err)
log.FromContext(ctx).Error(err, fmt.Sprintf("failed parsing %s=%s from pod %s", v1.PodDeletionCost, podDeletionCostStr, client.ObjectKeyFromObject(p)))
} else {
// the pod deletion disruptionCost is in [-2147483647, 2147483647]
// the min pod disruptionCost makes one pod ~ -15 pods, and the max pod disruptionCost to ~ 17 pods.
@@ -288,7 +287,7 @@ func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvid
if err != nil {
// don't error out on building the node pool, we just won't be able to handle any nodes that
// were created by it
logging.FromContext(ctx).Errorf("listing instance types for %s, %s", np.Name, err)
log.FromContext(ctx).Error(err, fmt.Sprintf("failed listing instance types for %s", np.Name))
continue
}
if len(nodePoolInstanceTypes) == 0 {
9 changes: 5 additions & 4 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -24,7 +24,8 @@ import (

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"

"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
@@ -89,7 +90,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB

if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
@@ -118,9 +119,9 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
if m.clock.Now().After(timeout) {
ConsolidationTimeoutTotalCounter.WithLabelValues(m.ConsolidationType()).Inc()
if lastSavedCommand.candidates == nil {
logging.FromContext(ctx).Debugf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
} else {
logging.FromContext(ctx).Debugf("stopping multi-node consolidation after timeout, returning last valid command %s", lastSavedCommand)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("stopping multi-node consolidation after timeout, returning last valid command %s", lastSavedCommand))
}
return lastSavedCommand, lastSavedResults, nil
}
12 changes: 7 additions & 5 deletions pkg/controllers/disruption/orchestration/queue.go
@@ -24,14 +24,15 @@ import (
"sync"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -177,7 +178,8 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
panic("unexpected failure, disruption queue has shut down")
}
cmd := item.(*Command)
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("command-id", string(cmd.id)))
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("command-id", string(cmd.id)))

if err := q.waitOrTerminate(ctx, cmd); err != nil {
// If recoverable, re-queue and try again.
if !IsUnrecoverableError(err) {
@@ -201,13 +203,13 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
}).Add(float64(len(failedLaunches)))
multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...))
// Log the error
logging.FromContext(ctx).With("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
return s.Name()
}), ",")).Errorf("failed to disrupt nodes, %s", multiErr)
}), ",")).Error(multiErr, "failed terminating nodes while executing a disruption command")
}
// If command is complete, remove command from queue.
q.Remove(cmd)
logging.FromContext(ctx).Infof("command succeeded")
log.FromContext(ctx).Info("command succeeded")
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

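
The queue change above also shows the context-enrichment idiom that replaces knative's logging.WithLogger: the reconciler tags a logger with command-id and stores it back into the context, so helpers called further down inherit the field without it being passed explicitly. A small sketch under those assumptions (waitOrTerminate is a hypothetical stand-in for the queue's helper):

```go
package main

import (
	"context"

	"github.com/go-logr/zapr"
	"go.uber.org/zap"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

// waitOrTerminate never receives the command id explicitly, yet its log lines
// still carry it because the logger stored in ctx was tagged by the caller.
func waitOrTerminate(ctx context.Context) {
	log.FromContext(ctx).Info("still waiting on nodeclaim termination")
}

func main() {
	zapLog, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	ctx := log.IntoContext(context.Background(), zapr.NewLogger(zapLog))

	// Tag the context logger once per command, as Reconcile does in the diff;
	// every FromContext call below this point inherits "command-id".
	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("command-id", "123e4567"))
	waitOrTerminate(ctx)
}
```
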
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/orchestration/suite_test.go
@@ -27,9 +27,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
. "knative.dev/pkg/logging/testing"
"sigs.k8s.io/controller-runtime/pkg/client"

. "sigs.k8s.io/karpenter/pkg/utils/testing"

"sigs.k8s.io/karpenter/pkg/apis"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
8 changes: 4 additions & 4 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -21,7 +21,7 @@ import (
"fmt"
"time"

"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/metrics"
@@ -62,21 +62,21 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
}
if s.clock.Now().After(timeout) {
ConsolidationTimeoutTotalCounter.WithLabelValues(s.ConsolidationType()).Inc()
logging.FromContext(ctx).Debugf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))
return Command{}, scheduling.Results{}, nil
}
// compute a possible consolidation option
cmd, results, err := s.computeConsolidation(ctx, candidate)
if err != nil {
logging.FromContext(ctx).Errorf("computing consolidation %s", err)
log.FromContext(ctx).Error(err, "failed computing consolidation")
continue
}
if cmd.Action() == NoOpAction {
continue
}
if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/suite_test.go
@@ -34,10 +34,11 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
clock "k8s.io/utils/clock/testing"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

. "sigs.k8s.io/karpenter/pkg/utils/testing"

coreapis "sigs.k8s.io/karpenter/pkg/apis"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/types.go
@@ -24,8 +24,8 @@ import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/utils/pod"

@@ -116,7 +116,7 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events
}
pods, err := node.Pods(ctx, kubeClient)
if err != nil {
logging.FromContext(ctx).Errorf("determining node pods, %s", err)
log.FromContext(ctx).Error(err, "failed resolving node pods")
return nil, fmt.Errorf("getting pods from state node, %w", err)
}
for _, po := range pods {