Skip to content

Commit

Permalink
1319 fix throttling (kyverno#1341)
Browse files Browse the repository at this point in the history
* fix policy status and generate controller issues

* shorten ACTION column name

* update logs

Co-authored-by: Shuting Zhao <[email protected]>
  • Loading branch information
JimBugwadia and realshuting authored Nov 30, 2020
1 parent 36615e2 commit 2344b2c
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 122 deletions.
2 changes: 1 addition & 1 deletion charts/kyverno/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ spec:
name: Background
type: string
- jsonPath: .spec.validationFailureAction
name: Validation Failure Action
name: Action
type: string
name: v1
schema:
Expand Down
3 changes: 1 addition & 2 deletions cmd/kyverno/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func main() {

// DYNAMIC CLIENT
// - client for all registered resources
client, err := dclient.NewClient(clientConfig, 5*time.Minute, stopCh, log.Log)
client, err := dclient.NewClient(clientConfig, 15*time.Minute, stopCh, log.Log)
if err != nil {
setupLog.Error(err, "Failed to create client")
os.Exit(1)
Expand Down Expand Up @@ -146,7 +146,6 @@ func main() {
// Resource Mutating Webhook Watcher
webhookMonitor := webhookconfig.NewMonitor(log.Log.WithName("WebhookMonitor"))


// KYVERNO CRD INFORMER
// watches CRD resources:
// - ClusterPolicy, Policy
Expand Down
2 changes: 1 addition & 1 deletion definitions/crds/kyverno.io_clusterpolicies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ spec:
name: Background
type: string
- jsonPath: .spec.validationFailureAction
name: Validation Failure Action
name: Action
type: string
name: v1
schema:
Expand Down
2 changes: 1 addition & 1 deletion definitions/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ spec:
name: Background
type: string
- jsonPath: .spec.validationFailureAction
name: Validation Failure Action
name: Action
type: string
name: v1
schema:
Expand Down
2 changes: 1 addition & 1 deletion definitions/install_debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ spec:
name: Background
type: string
- jsonPath: .spec.validationFailureAction
name: Validation Failure Action
name: Action
type: string
name: v1
schema:
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/kyverno/v1/clusterpolicy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// +kubebuilder:subresource:status
// +kubebuilder:resource:path=clusterpolicies,scope="Cluster",shortName=cpol
// +kubebuilder:printcolumn:name="Background",type="string",JSONPath=".spec.background"
// +kubebuilder:printcolumn:name="Validation Failure Action",type="string",JSONPath=".spec.validationFailureAction"
// +kubebuilder:printcolumn:name="Action",type="string",JSONPath=".spec.validationFailureAction"
type ClusterPolicy struct {
metav1.TypeMeta `json:",inline,omitempty" yaml:",inline,omitempty"`
metav1.ObjectMeta `json:"metadata,omitempty" yaml:"metadata,omitempty"`
Expand Down
27 changes: 16 additions & 11 deletions pkg/generate/cleanup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,13 @@ func NewController(
log logr.Logger,
) *Controller {
c := Controller{
kyvernoClient: kyvernoclient,
client: client,
//TODO: do the math for worst case back off and make sure cleanup runs after that
// as we dont want a deleted GR to be re-queue
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(1, 30), "generate-request-cleanup"),
kyvernoClient: kyvernoclient,
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "generate-request-cleanup"),
dynamicInformer: dynamicInformer,
log: log,
}

c.control = Control{client: kyvernoclient}
c.enqueueGR = c.enqueue
c.syncHandler = c.syncGenerateRequest
Expand All @@ -85,22 +84,23 @@ func NewController(
c.pSynced = pInformer.Informer().HasSynced
c.grSynced = grInformer.Informer().HasSynced

pInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.deletePolicy, // we only cleanup if the policy is delete
}, 2*time.Minute)
})

grInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
grInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addGR,
UpdateFunc: c.updateGR,
DeleteFunc: c.deleteGR,
}, 2*time.Minute)
})

//TODO: dynamic registration
// Only supported for namespaces
nsInformer := dynamicInformer.ForResource(client.DiscoveryClient.GetGVRFromKind("Namespace"))
c.nsInformer = nsInformer
c.nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
c.nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: c.deleteGenericResource,
}, 2*time.Minute)
})

return &c
}
Expand Down Expand Up @@ -134,6 +134,7 @@ func (c *Controller) deletePolicy(obj interface{}) {
return
}
}

logger.V(4).Info("deleting policy", "name", p.Name)
// clean up the GR
// Get the corresponding GR
Expand All @@ -143,6 +144,7 @@ func (c *Controller) deletePolicy(obj interface{}) {
logger.Error(err, "failed to generate request CR for the policy", "name", p.Name)
return
}

for _, gr := range grs {
c.addGR(gr)
}
Expand All @@ -167,12 +169,14 @@ func (c *Controller) deleteGR(obj interface{}) {
logger.Info("Couldn't get object from tombstone", "obj", obj)
return
}

_, ok = tombstone.Obj.(*kyverno.GenerateRequest)
if !ok {
logger.Info("ombstone contained object that is not a Generate Request", "obj", obj)
return
}
}

for _, resource := range gr.Status.GeneratedResources {
r, err := c.client.GetResource(resource.APIVersion, resource.Kind, resource.Namespace, resource.Name)
if err != nil && !apierrors.IsNotFound(err) {
Expand All @@ -187,6 +191,7 @@ func (c *Controller) deleteGR(obj interface{}) {
}
}
}

logger.V(4).Info("deleting Generate Request CR", "name", gr.Name)
// sync Handler will remove it from the queue
c.enqueueGR(gr)
Expand Down
5 changes: 3 additions & 2 deletions pkg/generate/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,9 @@ func (c *Controller) applyGeneratePolicy(log logr.Logger, policyContext engine.P
genResources = append(genResources, genResource)
}

if gr.Status.State == "" {
c.policyStatusListener.Send(generateSyncStats{
if gr.Status.State == "" && len(genResources) > 0 {
log.V(3).Info("updating policy status", "policy", policy.Name, "data", ruleNameToProcessingTime)
c.policyStatusListener.Update(generateSyncStats{
policyName: policy.Name,
ruleNameToProcessingTime: ruleNameToProcessingTime,
})
Expand Down
34 changes: 18 additions & 16 deletions pkg/generate/controller.go → pkg/generate/generate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
)

const (
maxRetries = 5
maxRetries = 5
resyncPeriod = 15 * time.Minute
)

// Controller manages the life-cycle for Generate-Requests and applies generate rule
type Controller struct {
// dyanmic client implementation
// dynamic client implementation
client *dclient.Client
// typed client for kyverno CRDs
// typed client for Kyverno CRDs
kyvernoClient *kyvernoclient.Clientset
// event generator interface
eventGen event.Interface
Expand All @@ -54,7 +55,7 @@ type Controller struct {
pSynced cache.InformerSynced
// grSynced returns true if the Generate Request store has been synced at least once
grSynced cache.InformerSynced
// dyanmic sharedinformer factory
// dynamic shared informer factory
dynamicInformer dynamicinformer.DynamicSharedInformerFactory
//TODO: list of generic informers
// only support Namespaces for re-evalutation on resource updates
Expand All @@ -80,12 +81,10 @@ func NewController(
resCache resourcecache.ResourceCacheIface,
) *Controller {
c := Controller{
client: client,
kyvernoClient: kyvernoclient,
eventGen: eventGen,
//TODO: do the math for worst case back off and make sure cleanup runs after that
// as we dont want a deleted GR to be re-queue
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(1, 30), "generate-request"),
client: client,
kyvernoClient: kyvernoclient,
eventGen: eventGen,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "generate-request"),
dynamicInformer: dynamicInformer,
log: log,
policyStatusListener: policyStatus,
Expand All @@ -94,16 +93,16 @@ func NewController(
}
c.statusControl = StatusControl{client: kyvernoclient}

pInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: c.updatePolicy, // We only handle updates to policy
// Deletion of policy will be handled by cleanup controller
}, 2*time.Minute)
})

grInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
grInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addGR,
UpdateFunc: c.updateGR,
DeleteFunc: c.deleteGR,
}, 2*time.Minute)
})

c.enqueueGR = c.enqueue
c.syncHandler = c.syncGenerateRequest
Expand All @@ -118,9 +117,10 @@ func NewController(
// Only supported for namespaces
nsInformer := dynamicInformer.ForResource(client.DiscoveryClient.GetGVRFromKind("Namespace"))
c.nsInformer = nsInformer
c.nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
c.nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: c.updateGenericResource,
}, 2*time.Minute)
})

return &c
}

Expand Down Expand Up @@ -306,6 +306,7 @@ func (c *Controller) syncGenerateRequest(key string) error {
defer func() {
logger.V(4).Info("finished sync", "key", key, "processingTime", time.Since(startTime).String())
}()

_, grName, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
Expand All @@ -320,5 +321,6 @@ func (c *Controller) syncGenerateRequest(key string) error {
logger.Error(err, "failed to list generate requests")
return err
}

return c.processGR(gr)
}
2 changes: 1 addition & 1 deletion pkg/kyverno/apply/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ To apply policy with variables:
if err != nil {
return err
}
dClient, err = client.NewClient(restConfig, 5*time.Minute, make(chan struct{}), log.Log)
dClient, err = client.NewClient(restConfig, 15*time.Minute, make(chan struct{}), log.Log)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/policy/controller.go → pkg/policy/validate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: eventInterface})

pc := PolicyController{
client: client,
kyvernoClient: kyvernoClient,
eventGen: eventGen,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "policy_controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy"),
configHandler: configHandler,
prGenerator: prGenerator,
log: log,
resCache: resCache,
client: client,
kyvernoClient: kyvernoClient,
eventGen: eventGen,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "policy_controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy"),
configHandler: configHandler,
prGenerator: prGenerator,
log: log,
resCache: resCache,
}

pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
2 changes: 1 addition & 1 deletion pkg/policyreport/reportrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (gen *Generator) syncHandler(info Info) error {
func (gen *Generator) sync(reportReq *unstructured.Unstructured, info Info) error {
defer func() {
if val := reportReq.GetAnnotations()["fromSync"]; val == "true" {
gen.policyStatusListener.Send(violationCount{
gen.policyStatusListener.Update(violationCount{
policyName: info.PolicyName,
violatedRules: info.Rules,
})
Expand Down
Loading

0 comments on commit 2344b2c

Please sign in to comment.