From 683a6c0c75e69c416f46df29db1389feac20b0cd Mon Sep 17 00:00:00 2001 From: Ville Aikas <11279988+vaikas@users.noreply.github.com> Date: Sat, 23 Apr 2022 21:30:55 -0700 Subject: [PATCH] Add parallelization for processing policies / authorities. (#1795) * Add parallelization for processing policies / authorities. Signed-off-by: Ville Aikas * Simplify by removing the wg. If error encountered, create an internal error. Signed-off-by: Ville Aikas --- pkg/cosign/kubernetes/webhook/validator.go | 151 ++++++++++++++------- 1 file changed, 103 insertions(+), 48 deletions(-) diff --git a/pkg/cosign/kubernetes/webhook/validator.go b/pkg/cosign/kubernetes/webhook/validator.go index 7590892a589..e9578ba4aa5 100644 --- a/pkg/cosign/kubernetes/webhook/validator.go +++ b/pkg/cosign/kubernetes/webhook/validator.go @@ -236,11 +236,13 @@ func (v *Validator) validatePodSpec(ctx context.Context, ps *corev1.PodSpec, opt // reasonable that the return value is 0, nil since there were no errors, but // the image was not validated against any matching policy and hence authority. func validatePolicies(ctx context.Context, ref name.Reference, policies map[string]webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (map[string]*PolicyResult, map[string][]error) { - // Gather all validated policies here. - policyResults := make(map[string]*PolicyResult) - // For a policy that does not pass at least one authority, gather errors - // here so that we can give meaningful errors to the user. - ret := map[string][]error{} + type retChannelType struct { + name string + policyResult *PolicyResult + errors []error + } + results := make(chan retChannelType, len(policies)) + // For each matching policy it must validate at least one Authority within // it. // From the Design document, the part about multiple Policies matching: @@ -249,31 +251,53 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri // If none of the Authorities for a given policy pass the checks, gather // the errors here. If one passes, do not return the errors. for cipName, cip := range policies { + // Due to running in gofunc + cipName := cipName + cip := cip logging.FromContext(ctx).Debugf("Checking Policy: %s", cipName) - policyResult, errs := ValidatePolicy(ctx, ref, cip, remoteOpts...) - if len(errs) > 0 { - ret[cipName] = append(ret[cipName], errs...) - } else { - // Ok, at least one Authority on the policy passed. If there's a CIP level - // policy, apply it against the results of the successful Authorities - // outputs. - if cip.Policy != nil { - logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName) - policyJSON, err := json.Marshal(policyResult) - if err != nil { - ret[cipName] = append(ret[cipName], errors.Wrap(err, "marshaling policyresult")) - } else { - logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON)) - err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON) + go func() { + result := retChannelType{name: cipName} + + result.policyResult, result.errors = ValidatePolicy(ctx, ref, cip, remoteOpts...) + if len(result.errors) == 0 { + // Ok, at least one Authority on the policy passed. If there's a CIP level + // policy, apply it against the results of the successful Authorities + // outputs. + if cip.Policy != nil { + logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName) + policyJSON, err := json.Marshal(result.policyResult) if err != nil { - ret[cipName] = append(ret[cipName], err) + results <- result } else { - policyResults[cipName] = policyResult + logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON)) + err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON) + if err != nil { + result.errors = append(result.errors, err) + } } } - } else { - policyResults[cipName] = policyResult } + results <- result + }() + } + // Gather all validated policies here. + policyResults := make(map[string]*PolicyResult) + // For a policy that does not pass at least one authority, gather errors + // here so that we can give meaningful errors to the user. + ret := map[string][]error{} + + for i := 0; i < len(policies); i++ { + result, ok := <-results + if !ok { + ret["internalerror"] = append(ret["internalerror"], fmt.Errorf("results channel failed to produce a result")) + } + switch { + case len(result.errors) > 0: + ret[result.name] = append(ret[result.name], result.errors...) + case len(result.policyResult.AuthorityMatches) > 0: + policyResults[result.name] = result.policyResult + default: + ret[result.name] = append(ret[result.name], fmt.Errorf("failed to process policy: %s", result.name)) } } return policyResults, ret @@ -285,40 +309,71 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri // Returns PolicyResult, or errors encountered if none of the authorities // passed. func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (*PolicyResult, []error) { - // If none of the Authorities for a given policy pass the checks, gather - // the errors here. If one passes, do not return the errors. - authorityErrors := []error{} - // We collect all the successfully satisfied Authorities into this and - // return it. - policyResult := PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)} + // Each gofunc creates and puts one of these into a results channel. + // Once each gofunc finishes, we go through the channel and pull out + // the results. + type retChannelType struct { + name string + attestations map[string][]PolicySignature + signatures []PolicySignature + err error + } + results := make(chan retChannelType, len(cip.Authorities)) for _, authority := range cip.Authorities { + authority := authority // due to gofunc logging.FromContext(ctx).Debugf("Checking Authority: %s", authority.Name) - // Assignment for appendAssign lint error - authorityRemoteOpts := remoteOpts - authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...) - if len(authority.Attestations) > 0 { - // We're doing the verify-attestations path, so validate (.att) - validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...) - if err != nil { - authorityErrors = append(authorityErrors, err) - } else { - policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Attestations: validatedAttestations} - } - } else { - // We're doing the verify path, so validate image signatures (.sig) - validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...) - if err != nil { - authorityErrors = append(authorityErrors, err) + go func() { + result := retChannelType{name: authority.Name} + // Assignment for appendAssign lint error + authorityRemoteOpts := remoteOpts + authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...) + + if len(authority.Attestations) > 0 { + // We're doing the verify-attestations path, so validate (.att) + validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...) + if err != nil { + result.err = err + } else { + result.attestations = validatedAttestations + } } else { - policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Signatures: validatedSignatures} + validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...) + if err != nil { + result.err = err + } else { + result.signatures = validatedSignatures + } } + results <- result + }() + } + // If none of the Authorities for a given policy pass the checks, gather + // the errors here. If one passes, do not return the errors. + authorityErrors := []error{} + // We collect all the successfully satisfied Authorities into this and + // return it. + policyResult := &PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)} + for i := 0; i < len(cip.Authorities); i++ { + result, ok := <-results + if !ok { + authorityErrors = append(authorityErrors, fmt.Errorf("results channel failed to produce a result")) + } + switch { + case result.err != nil: + authorityErrors = append(authorityErrors, result.err) + case len(result.signatures) > 0: + policyResult.AuthorityMatches[result.name] = AuthorityMatch{Signatures: result.signatures} + case len(result.attestations) > 0: + policyResult.AuthorityMatches[result.name] = AuthorityMatch{Attestations: result.attestations} + default: + authorityErrors = append(authorityErrors, fmt.Errorf("failed to process authority: %s", result.name)) } } if len(authorityErrors) > 0 { return nil, authorityErrors } - return &policyResult, authorityErrors + return policyResult, authorityErrors } func ociSignatureToPolicySignature(ctx context.Context, sigs []oci.Signature) []PolicySignature {