Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: migrate knative logger to controller-runtime injected logger #1050

Merged
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/awslabs/operatorpkg v0.0.0-20240514175841-edb8fe5824b4
github.com/docker/docker v26.1.2+incompatible
github.com/go-logr/logr v1.4.1
github.com/go-logr/zapr v1.3.0
github.com/imdario/mergo v0.3.16
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo/v2 v2.17.3
Expand All @@ -35,8 +34,6 @@ require (
sigs.k8s.io/controller-runtime v0.18.2
)

require github.com/go-task/slim-sprig/v3 v3.0.0 // indirect

require (
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
Expand All @@ -52,9 +49,11 @@ require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/zapr v1.3.0
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gobuffalo/flect v0.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down
4 changes: 2 additions & 2 deletions kwok/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package main

import (
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/log"

kwok "sigs.k8s.io/karpenter/kwok/cloudprovider"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
Expand All @@ -41,7 +41,7 @@ func main() {
ctx, op := operator.NewOperator()
instanceTypes, err := kwok.ConstructInstanceTypes()
if err != nil {
logging.FromContext(ctx).Fatalf("failed to construct instance types: %s", err)
log.FromContext(ctx).Error(err, "failed constructing instance types")
}

cloudProvider := kwok.NewCloudProvider(ctx, op.GetClient(), instanceTypes)
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/v1beta1/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
. "knative.dev/pkg/logging/testing"

. "sigs.k8s.io/karpenter/pkg/utils/testing"

"sigs.k8s.io/karpenter/pkg/apis"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
Expand Down
18 changes: 9 additions & 9 deletions pkg/controllers/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package disruption
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -27,13 +28,11 @@ import (
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
Expand All @@ -43,6 +42,7 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/operator/controller"
operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"
)

type Controller struct {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
// with making any scheduling decision off of our state nodes. Otherwise, we have the potential to make
// a scheduling decision based on a smaller subset of nodes in our cluster state than actually exist.
if !c.cluster.Synced(ctx) {
logging.FromContext(ctx).Debugf("waiting on cluster sync")
log.FromContext(ctx).V(1).Info("waiting on cluster sync")
return reconcile.Result{RequeueAfter: time.Second}, nil
}

Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
// 3. Add Command to orchestration.Queue to wait to delete the candidates.
func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
commandID := uuid.NewUUID()
logging.FromContext(ctx).With("command-id", commandID).Infof("disrupting via %s %s", m.Type(), cmd)
log.FromContext(ctx).WithValues("command-id", commandID).Info(fmt.Sprintf("disrupting via %s %s", m.Type(), cmd))
jonathan-innis marked this conversation as resolved.
Show resolved Hide resolved

stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
Expand Down Expand Up @@ -214,7 +214,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// tainted with the Karpenter taint, the provisioning controller will continue
// to do scheduling simulations and nominate the pods on the candidate nodes until
// the node is cleaned up.
schedulingResults.Record(logging.WithLogger(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)
schedulingResults.Record(log.IntoContext(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)

providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
Expand Down Expand Up @@ -275,7 +275,7 @@ func (c *Controller) logAbnormalRuns(ctx context.Context) {
defer c.mu.Unlock()
for name, runTime := range c.lastRun {
if timeSince := c.clock.Since(runTime); timeSince > AbnormalTimeLimit {
logging.FromContext(ctx).Debugf("abnormal time between runs of %s = %s", name, timeSince)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abnormal time between runs of %s = %s", name, timeSince))
}
}
}
Expand All @@ -284,7 +284,7 @@ func (c *Controller) logAbnormalRuns(ctx context.Context) {
func (c *Controller) logInvalidBudgets(ctx context.Context) {
nodePoolList := &v1beta1.NodePoolList{}
if err := c.kubeClient.List(ctx, nodePoolList); err != nil {
logging.FromContext(ctx).Errorf("listing nodepools, %s", err)
log.FromContext(ctx).Error(err, "failed listing nodepools")
return
}
var buf bytes.Buffer
Expand All @@ -295,6 +295,6 @@ func (c *Controller) logInvalidBudgets(ctx context.Context) {
}
}
if buf.Len() > 0 {
logging.FromContext(ctx).Errorf("detected disruption budget errors: %s", buf.String())
log.FromContext(ctx).Error(errors.New(buf.String()), "detected disruption budget errors")
}
}
7 changes: 4 additions & 3 deletions pkg/controllers/disruption/emptynodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package disruption
import (
"context"
"errors"
"fmt"

"github.com/samber/lo"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/metrics"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, err
Expand All @@ -99,7 +100,7 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
if lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
return len(c.reschedulablePods) != 0
}) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

"k8s.io/utils/clock"

"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (e *Expiration) ComputeCommand(ctx context.Context, disruptionBudgetMapping
e.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, results.NonPendingPodSchedulingErrors())...)
continue
}
logging.FromContext(ctx).With("ttl", candidates[0].nodePool.Spec.Disruption.ExpireAfter.String()).Infof("triggering termination for expired node after TTL")
log.FromContext(ctx).WithValues("ttl", candidates[0].nodePool.Spec.Disruption.ExpireAfter.String()).Info("triggering termination for expired node after TTL")
return Command{
candidates: []*Candidate{candidate},
replacements: results.NewNodeClaims,
Expand Down
11 changes: 5 additions & 6 deletions pkg/controllers/disruption/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
Expand Down Expand Up @@ -82,7 +82,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
pods = append(pods, n.reschedulablePods...)
}
pods = append(pods, deletingNodePods...)
scheduler, err := provisioner.NewScheduler(logging.WithLogger(ctx, operatorlogging.NopLogger), pods, stateNodes)
scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes)
if err != nil {
return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
Expand All @@ -91,7 +91,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
return client.ObjectKeyFromObject(p), nil
})

results := scheduler.Solve(logging.WithLogger(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
for _, n := range results.ExistingNodes {
// We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
// tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
Expand Down Expand Up @@ -150,8 +150,7 @@ func GetPodEvictionCost(ctx context.Context, p *v1.Pod) float64 {
if ok {
podDeletionCost, err := strconv.ParseFloat(podDeletionCostStr, 64)
if err != nil {
logging.FromContext(ctx).Errorf("parsing %s=%s from pod %s, %s",
v1.PodDeletionCost, podDeletionCostStr, client.ObjectKeyFromObject(p), err)
log.FromContext(ctx).Error(err, fmt.Sprintf("failed parsing %s=%s from pod %s", v1.PodDeletionCost, podDeletionCostStr, client.ObjectKeyFromObject(p)))
} else {
// the pod deletion disruptionCost is in [-2147483647, 2147483647]
// the min pod disruptionCost makes one pod ~ -15 pods, and the max pod disruptionCost to ~ 17 pods.
Expand Down Expand Up @@ -288,7 +287,7 @@ func BuildNodePoolMap(ctx context.Context, kubeClient client.Client, cloudProvid
if err != nil {
// don't error out on building the node pool, we just won't be able to handle any nodes that
// were created by it
logging.FromContext(ctx).Errorf("listing instance types for %s, %s", np.Name, err)
log.FromContext(ctx).Error(err, fmt.Sprintf("failed listing instance types for %s", np.Name))
continue
}
if len(nodePoolInstanceTypes) == 0 {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/disruption/multinodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/logging"

"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB

if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
Expand Down Expand Up @@ -118,9 +119,9 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
if m.clock.Now().After(timeout) {
ConsolidationTimeoutTotalCounter.WithLabelValues(m.ConsolidationType()).Inc()
if lastSavedCommand.candidates == nil {
logging.FromContext(ctx).Debugf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
} else {
logging.FromContext(ctx).Debugf("stopping multi-node consolidation after timeout, returning last valid command %s", lastSavedCommand)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("stopping multi-node consolidation after timeout, returning last valid command %s", lastSavedCommand))
}
return lastSavedCommand, lastSavedResults, nil
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/controllers/disruption/orchestration/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"sync"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -177,7 +178,8 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
panic("unexpected failure, disruption queue has shut down")
}
cmd := item.(*Command)
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("command-id", string(cmd.id)))
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("command-id", string(cmd.id)))

if err := q.waitOrTerminate(ctx, cmd); err != nil {
// If recoverable, re-queue and try again.
if !IsUnrecoverableError(err) {
Expand All @@ -201,13 +203,13 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
}).Add(float64(len(failedLaunches)))
multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...))
// Log the error
logging.FromContext(ctx).With("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
return s.Name()
}), ",")).Errorf("failed to disrupt nodes, %s", multiErr)
}), ",")).Error(multiErr, "failed terminating nodes while executing a disruption command")
}
// If command is complete, remove command from queue.
q.Remove(cmd)
logging.FromContext(ctx).Infof("command succeeded")
log.FromContext(ctx).Info("command succeeded")
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/orchestration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
. "knative.dev/pkg/logging/testing"
"sigs.k8s.io/controller-runtime/pkg/client"

. "sigs.k8s.io/karpenter/pkg/utils/testing"

"sigs.k8s.io/karpenter/pkg/apis"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/disruption/singlenodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"
"time"

"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/metrics"
Expand Down Expand Up @@ -62,21 +62,21 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
}
if s.clock.Now().After(timeout) {
ConsolidationTimeoutTotalCounter.WithLabelValues(s.ConsolidationType()).Inc()
logging.FromContext(ctx).Debugf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))
return Command{}, scheduling.Results{}, nil
}
// compute a possible consolidation option
cmd, results, err := s.computeConsolidation(ctx, candidate)
if err != nil {
logging.FromContext(ctx).Errorf("computing consolidation %s", err)
log.FromContext(ctx).Error(err, "failed computing consolidation")
continue
}
if cmd.Action() == NoOpAction {
continue
}
if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd))
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
clock "k8s.io/utils/clock/testing"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

. "sigs.k8s.io/karpenter/pkg/utils/testing"

coreapis "sigs.k8s.io/karpenter/pkg/apis"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/utils/pod"

Expand Down Expand Up @@ -116,7 +116,7 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events
}
pods, err := node.Pods(ctx, kubeClient)
if err != nil {
logging.FromContext(ctx).Errorf("determining node pods, %s", err)
log.FromContext(ctx).Error(err, "failed resolving node pods")
return nil, fmt.Errorf("getting pods from state node, %w", err)
}
for _, po := range pods {
Expand Down
Loading
Loading