Skip to content

Commit

Permalink
Add parallelization for processing policies / authorities. (#1795)
Browse files Browse the repository at this point in the history
* Add parallelization for processing policies / authorities.

Signed-off-by: Ville Aikas <[email protected]>

* Simplify by removing the wg. If error encountered, create an internal
error.

Signed-off-by: Ville Aikas <[email protected]>
  • Loading branch information
vaikas authored Apr 24, 2022
1 parent 8cac645 commit 683a6c0
Showing 1 changed file with 103 additions and 48 deletions.
151 changes: 103 additions & 48 deletions pkg/cosign/kubernetes/webhook/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,13 @@ func (v *Validator) validatePodSpec(ctx context.Context, ps *corev1.PodSpec, opt
// reasonable that the return value is 0, nil since there were no errors, but
// the image was not validated against any matching policy and hence authority.
func validatePolicies(ctx context.Context, ref name.Reference, policies map[string]webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (map[string]*PolicyResult, map[string][]error) {
// Gather all validated policies here.
policyResults := make(map[string]*PolicyResult)
// For a policy that does not pass at least one authority, gather errors
// here so that we can give meaningful errors to the user.
ret := map[string][]error{}
type retChannelType struct {
name string
policyResult *PolicyResult
errors []error
}
results := make(chan retChannelType, len(policies))

// For each matching policy it must validate at least one Authority within
// it.
// From the Design document, the part about multiple Policies matching:
Expand All @@ -249,31 +251,53 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri
// If none of the Authorities for a given policy pass the checks, gather
// the errors here. If one passes, do not return the errors.
for cipName, cip := range policies {
// Due to running in gofunc
cipName := cipName
cip := cip
logging.FromContext(ctx).Debugf("Checking Policy: %s", cipName)
policyResult, errs := ValidatePolicy(ctx, ref, cip, remoteOpts...)
if len(errs) > 0 {
ret[cipName] = append(ret[cipName], errs...)
} else {
// Ok, at least one Authority on the policy passed. If there's a CIP level
// policy, apply it against the results of the successful Authorities
// outputs.
if cip.Policy != nil {
logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName)
policyJSON, err := json.Marshal(policyResult)
if err != nil {
ret[cipName] = append(ret[cipName], errors.Wrap(err, "marshaling policyresult"))
} else {
logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON))
err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON)
go func() {
result := retChannelType{name: cipName}

result.policyResult, result.errors = ValidatePolicy(ctx, ref, cip, remoteOpts...)
if len(result.errors) == 0 {
// Ok, at least one Authority on the policy passed. If there's a CIP level
// policy, apply it against the results of the successful Authorities
// outputs.
if cip.Policy != nil {
logging.FromContext(ctx).Infof("Validating CIP level policy for %s", cipName)
policyJSON, err := json.Marshal(result.policyResult)
if err != nil {
ret[cipName] = append(ret[cipName], err)
results <- result
} else {
policyResults[cipName] = policyResult
logging.FromContext(ctx).Infof("Validating CIP level policy against %s", string(policyJSON))
err = policy.EvaluatePolicyAgainstJSON(ctx, "ClusterImagePolicy", cip.Policy.Type, cip.Policy.Data, policyJSON)
if err != nil {
result.errors = append(result.errors, err)
}
}
}
} else {
policyResults[cipName] = policyResult
}
results <- result
}()
}
// Gather all validated policies here.
policyResults := make(map[string]*PolicyResult)
// For a policy that does not pass at least one authority, gather errors
// here so that we can give meaningful errors to the user.
ret := map[string][]error{}

for i := 0; i < len(policies); i++ {
result, ok := <-results
if !ok {
ret["internalerror"] = append(ret["internalerror"], fmt.Errorf("results channel failed to produce a result"))
}
switch {
case len(result.errors) > 0:
ret[result.name] = append(ret[result.name], result.errors...)
case len(result.policyResult.AuthorityMatches) > 0:
policyResults[result.name] = result.policyResult
default:
ret[result.name] = append(ret[result.name], fmt.Errorf("failed to process policy: %s", result.name))
}
}
return policyResults, ret
Expand All @@ -285,40 +309,71 @@ func validatePolicies(ctx context.Context, ref name.Reference, policies map[stri
// Returns PolicyResult, or errors encountered if none of the authorities
// passed.
func ValidatePolicy(ctx context.Context, ref name.Reference, cip webhookcip.ClusterImagePolicy, remoteOpts ...ociremote.Option) (*PolicyResult, []error) {
// If none of the Authorities for a given policy pass the checks, gather
// the errors here. If one passes, do not return the errors.
authorityErrors := []error{}
// We collect all the successfully satisfied Authorities into this and
// return it.
policyResult := PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)}
// Each gofunc creates and puts one of these into a results channel.
// Once each gofunc finishes, we go through the channel and pull out
// the results.
type retChannelType struct {
name string
attestations map[string][]PolicySignature
signatures []PolicySignature
err error
}
results := make(chan retChannelType, len(cip.Authorities))
for _, authority := range cip.Authorities {
authority := authority // due to gofunc
logging.FromContext(ctx).Debugf("Checking Authority: %s", authority.Name)
// Assignment for appendAssign lint error
authorityRemoteOpts := remoteOpts
authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...)

if len(authority.Attestations) > 0 {
// We're doing the verify-attestations path, so validate (.att)
validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
authorityErrors = append(authorityErrors, err)
} else {
policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Attestations: validatedAttestations}
}
} else {
// We're doing the verify path, so validate image signatures (.sig)
validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
authorityErrors = append(authorityErrors, err)
go func() {
result := retChannelType{name: authority.Name}
// Assignment for appendAssign lint error
authorityRemoteOpts := remoteOpts
authorityRemoteOpts = append(authorityRemoteOpts, authority.RemoteOpts...)

if len(authority.Attestations) > 0 {
// We're doing the verify-attestations path, so validate (.att)
validatedAttestations, err := ValidatePolicyAttestationsForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
result.err = err
} else {
result.attestations = validatedAttestations
}
} else {
policyResult.AuthorityMatches[authority.Name] = AuthorityMatch{Signatures: validatedSignatures}
validatedSignatures, err := ValidatePolicySignaturesForAuthority(ctx, ref, authority, authorityRemoteOpts...)
if err != nil {
result.err = err
} else {
result.signatures = validatedSignatures
}
}
results <- result
}()
}
// If none of the Authorities for a given policy pass the checks, gather
// the errors here. If one passes, do not return the errors.
authorityErrors := []error{}
// We collect all the successfully satisfied Authorities into this and
// return it.
policyResult := &PolicyResult{AuthorityMatches: make(map[string]AuthorityMatch)}
for i := 0; i < len(cip.Authorities); i++ {
result, ok := <-results
if !ok {
authorityErrors = append(authorityErrors, fmt.Errorf("results channel failed to produce a result"))
}
switch {
case result.err != nil:
authorityErrors = append(authorityErrors, result.err)
case len(result.signatures) > 0:
policyResult.AuthorityMatches[result.name] = AuthorityMatch{Signatures: result.signatures}
case len(result.attestations) > 0:
policyResult.AuthorityMatches[result.name] = AuthorityMatch{Attestations: result.attestations}
default:
authorityErrors = append(authorityErrors, fmt.Errorf("failed to process authority: %s", result.name))
}
}
if len(authorityErrors) > 0 {
return nil, authorityErrors
}
return &policyResult, authorityErrors
return policyResult, authorityErrors
}

func ociSignatureToPolicySignature(ctx context.Context, sigs []oci.Signature) []PolicySignature {
Expand Down

0 comments on commit 683a6c0

Please sign in to comment.