Skip to content

Commit

Permalink
Resource client patch for apply status (#517)
Browse files Browse the repository at this point in the history
* Add patch to resource client to cut api calls in half

* Move to apply status since consul api doesn't have patch

* Implement the other apply status functions

* Get rid of debug info

* Clean up extra code

* Actually use the bytes from the provided status

* Add changelog

* Pod resource client to implement interface

* Api client to implement interface

* Namespace resource client to implement interface

* Use more performant one with standard k8s clients

* Namespace client needs too

* Codegen

* Fix reporter unit test

* Fix reconciler unit tests

* Codegen

* Update interface to use status client

* Codegen

* Use namespaced status by default

* Fix reporter unit test

* Fix reconciler unit test

* Prefer merge patch type

* Ensure we render all fields

* Codegen

* Go mod tidy

* Prefer json patch type

* Ensure statuses is always defaulted

* Prefer json encoding

* Codegen

* Improve comment readability

* Get rid of gogo

* Get rid of gogo

* Get rid of gogo, with codegen

* Move shared impl to shared package

* Move shared impl for jsonpatch to shared package

* Move other clients to shared impl for jsonpatch

* Remove gogo

* Only clone if we are going to write status

* Be clear we are patching the status

* Add note in changelog about rendering enums as strings

Co-authored-by: soloio-bulldozer[bot] <48420018+soloio-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
Kevin Dorosh and soloio-bulldozer[bot] authored Sep 14, 2022
1 parent 2b31151 commit 618b1c9
Show file tree
Hide file tree
Showing 24 changed files with 490 additions and 117 deletions.
7 changes: 7 additions & 0 deletions changelog/v0.30.3/patch-status.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
changelog:
- type: FIX
resolvesIssue: false
description: >-
Improve scalability of v2 status reporter by using k8s patch instead of read then write.
Also updates the status written to render enums as strings rather than ints for readability.
issueLink: https://github.com/solo-io/gloo/issues/7076
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/solo-io/solo-kit/api/external/kubernetes/customresourcedefinition"

"github.com/solo-io/solo-kit/pkg/api/shared"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/common"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
Expand All @@ -18,6 +19,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
)

type customResourceDefinitionResourceClient struct {
Expand Down Expand Up @@ -124,6 +126,34 @@ func (rc *customResourceDefinitionResourceClient) Write(resource resources.Resou
return rc.Read(customResourceDefinitionObj.Namespace, customResourceDefinitionObj.Name, clients.ReadOpts{Ctx: opts.Ctx})
}

func (rc *customResourceDefinitionResourceClient) ApplyStatus(statusClient resources.StatusClient, inputResource resources.InputResource, opts clients.ApplyStatusOpts) (resources.Resource, error) {
name := inputResource.GetMetadata().GetName()
namespace := inputResource.GetMetadata().GetNamespace()
if err := resources.ValidateName(name); err != nil {
return nil, errors.Wrapf(err, "validation error")
}
opts = opts.WithDefaults()

data, err := shared.GetJsonPatchData(inputResource)
if err != nil {
return nil, errors.Wrapf(err, "error getting status json patch data")
}

customResourceDefinitionObj, err := rc.apiExts.ApiextensionsV1().CustomResourceDefinitions().Patch(opts.Ctx, name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, errors.NewNotExistErr(namespace, name, err)
}
return nil, errors.Wrapf(err, "patching customResourceDefinitionObj status from kubernetes")
}
resource := FromKubeCustomResourceDefinition(customResourceDefinitionObj)

if resource == nil {
return nil, errors.Errorf("customResourceDefinitionObj %v is not kind %v", name, rc.Kind())
}
return resource, nil
}

func (rc *customResourceDefinitionResourceClient) Delete(namespace, name string, opts clients.DeleteOpts) error {
opts = opts.WithDefaults()
if !rc.exist(opts.Ctx, namespace, name) {
Expand Down
30 changes: 30 additions & 0 deletions pkg/api/external/kubernetes/deployment/resource_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

kubedeployment "github.com/solo-io/solo-kit/api/external/kubernetes/deployment"
"github.com/solo-io/solo-kit/pkg/api/shared"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/common"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/cache"
skkube "github.com/solo-io/solo-kit/pkg/api/v1/resources/common/kubernetes"
Expand All @@ -16,6 +17,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -123,6 +125,34 @@ func (rc *deploymentResourceClient) Write(resource resources.Resource, opts clie
return rc.Read(deploymentObj.Namespace, deploymentObj.Name, clients.ReadOpts{Ctx: opts.Ctx})
}

func (rc *deploymentResourceClient) ApplyStatus(statusClient resources.StatusClient, inputResource resources.InputResource, opts clients.ApplyStatusOpts) (resources.Resource, error) {
name := inputResource.GetMetadata().GetName()
namespace := inputResource.GetMetadata().GetNamespace()
if err := resources.ValidateName(name); err != nil {
return nil, errors.Wrapf(err, "validation error")
}
opts = opts.WithDefaults()

data, err := shared.GetJsonPatchData(inputResource)
if err != nil {
return nil, errors.Wrapf(err, "error getting status json patch data")
}

deploymentObj, err := rc.Kube.AppsV1().Deployments(namespace).Patch(opts.Ctx, name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, errors.NewNotExistErr(namespace, name, err)
}
return nil, errors.Wrapf(err, "patching deployment status from kubernetes")
}
resource := FromKubeDeployment(deploymentObj)

if resource == nil {
return nil, errors.Errorf("deployment %v is not kind %v", name, rc.Kind())
}
return resource, nil
}

func (rc *deploymentResourceClient) Delete(namespace, name string, opts clients.DeleteOpts) error {
opts = opts.WithDefaults()
if !rc.exist(opts.Ctx, namespace, name) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/api/external/kubernetes/job/resource_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

kubejob "github.com/solo-io/solo-kit/api/external/kubernetes/job"
"github.com/solo-io/solo-kit/pkg/api/shared"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/common"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/cache"
skkube "github.com/solo-io/solo-kit/pkg/api/v1/resources/common/kubernetes"
Expand All @@ -16,6 +17,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -123,6 +125,33 @@ func (rc *jobResourceClient) Write(resource resources.Resource, opts clients.Wri
return rc.Read(jobObj.Namespace, jobObj.Name, clients.ReadOpts{Ctx: opts.Ctx})
}

func (rc *jobResourceClient) ApplyStatus(statusClient resources.StatusClient, inputResource resources.InputResource, opts clients.ApplyStatusOpts) (resources.Resource, error) {
name := inputResource.GetMetadata().GetName()
namespace := inputResource.GetMetadata().GetNamespace()
if err := resources.ValidateName(name); err != nil {
return nil, errors.Wrapf(err, "validation error")
}
opts = opts.WithDefaults()

data, err := shared.GetJsonPatchData(inputResource)
if err != nil {
return nil, errors.Wrapf(err, "error getting status json patch data")
}
jobObj, err := rc.Kube.BatchV1().Jobs(namespace).Patch(opts.Ctx, name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, errors.NewNotExistErr(namespace, name, err)
}
return nil, errors.Wrapf(err, "patching job status from kubernetes")
}
resource := FromKubeJob(jobObj)

if resource == nil {
return nil, errors.Errorf("job %v is not kind %v", name, rc.Kind())
}
return resource, nil
}

func (rc *jobResourceClient) Delete(namespace, name string, opts clients.DeleteOpts) error {
opts = opts.WithDefaults()
if !rc.exist(opts.Ctx, namespace, name) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/api/external/kubernetes/namespace/resource_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/bugsnag/bugsnag-go/errors"
"github.com/rotisserie/eris"
kubenamespace "github.com/solo-io/solo-kit/api/external/kubernetes/namespace"
"github.com/solo-io/solo-kit/pkg/api/shared"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/common"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/cache"
Expand All @@ -17,6 +18,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -122,6 +124,33 @@ func (rc *namespaceResourceClient) Write(resource resources.Resource, opts clien
return rc.Read(namespaceObj.Namespace, namespaceObj.Name, clients.ReadOpts{Ctx: opts.Ctx})
}

func (rc *namespaceResourceClient) ApplyStatus(statusClient resources.StatusClient, inputResource resources.InputResource, opts clients.ApplyStatusOpts) (resources.Resource, error) {
name := inputResource.GetMetadata().GetName()
namespace := inputResource.GetMetadata().GetNamespace()
if err := resources.ValidateName(name); err != nil {
return nil, eris.Wrapf(err, "validation error")
}
opts = opts.WithDefaults()

data, err := shared.GetJsonPatchData(inputResource)
if err != nil {
return nil, eris.Wrapf(err, "error getting status json patch data")
}
namespaceObj, err := rc.Kube.CoreV1().Namespaces().Patch(opts.Ctx, name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, skerrors.NewNotExistErr(namespace, name, err)
}
return nil, eris.Wrapf(err, "reading namespaceObj from kubernetes")
}
resource := FromKubeNamespace(namespaceObj)

if resource == nil {
return nil, eris.Errorf("namespaceObj %v is not kind %v", name, rc.Kind())
}
return resource, nil
}

func (rc *namespaceResourceClient) Delete(namespace, name string, opts clients.DeleteOpts) error {
opts = opts.WithDefaults()
if !rc.exist(opts.Ctx, namespace, name) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/api/external/kubernetes/pod/resource_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

kubepod "github.com/solo-io/solo-kit/api/external/kubernetes/pod"
"github.com/solo-io/solo-kit/pkg/api/shared"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/common"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/cache"
skkube "github.com/solo-io/solo-kit/pkg/api/v1/resources/common/kubernetes"
Expand All @@ -16,6 +17,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -123,6 +125,33 @@ func (rc *podResourceClient) Write(resource resources.Resource, opts clients.Wri
return rc.Read(podObj.Namespace, podObj.Name, clients.ReadOpts{Ctx: opts.Ctx})
}

func (rc *podResourceClient) ApplyStatus(statusClient resources.StatusClient, inputResource resources.InputResource, opts clients.ApplyStatusOpts) (resources.Resource, error) {
name := inputResource.GetMetadata().GetName()
namespace := inputResource.GetMetadata().GetNamespace()
if err := resources.ValidateName(name); err != nil {
return nil, errors.Wrapf(err, "validation error")
}
opts = opts.WithDefaults()

data, err := shared.GetJsonPatchData(inputResource)
if err != nil {
return nil, errors.Wrapf(err, "error getting status json patch data")
}
podObj, err := rc.Kube.CoreV1().Pods(namespace).Patch(opts.Ctx, name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, errors.NewNotExistErr(namespace, name, err)
}
return nil, errors.Wrapf(err, "patching podObj status from kubernetes")
}
resource := FromKubePod(podObj)

if resource == nil {
return nil, errors.Errorf("podObj %v is not kind %v", name, rc.Kind())
}
return resource, nil
}

func (rc *podResourceClient) Delete(namespace, name string, opts clients.DeleteOpts) error {
opts = opts.WithDefaults()
if !rc.exist(opts.Ctx, namespace, name) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/api/external/kubernetes/service/resource_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

kubeservice "github.com/solo-io/solo-kit/api/external/kubernetes/service"
"github.com/solo-io/solo-kit/pkg/api/shared"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/common"
"github.com/solo-io/solo-kit/pkg/api/v1/clients/kube/cache"
skkube "github.com/solo-io/solo-kit/pkg/api/v1/resources/common/kubernetes"
Expand All @@ -16,6 +17,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -123,6 +125,33 @@ func (rc *serviceResourceClient) Write(resource resources.Resource, opts clients
return rc.Read(serviceObj.Namespace, serviceObj.Name, clients.ReadOpts{Ctx: opts.Ctx})
}

func (rc *serviceResourceClient) ApplyStatus(statusClient resources.StatusClient, inputResource resources.InputResource, opts clients.ApplyStatusOpts) (resources.Resource, error) {
name := inputResource.GetMetadata().GetName()
namespace := inputResource.GetMetadata().GetNamespace()
if err := resources.ValidateName(name); err != nil {
return nil, errors.Wrapf(err, "validation error")
}
opts = opts.WithDefaults()

data, err := shared.GetJsonPatchData(inputResource)
if err != nil {
return nil, errors.Wrapf(err, "error getting status json patch data")
}
serviceObj, err := rc.Kube.CoreV1().Services(namespace).Patch(opts.Ctx, name, types.JSONPatchType, data, metav1.PatchOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return nil, errors.NewNotExistErr(namespace, name, err)
}
return nil, errors.Wrapf(err, "patching serviceObj status from kubernetes")
}
resource := FromKubeService(serviceObj)

if resource == nil {
return nil, errors.Errorf("serviceObj %v is not kind %v", name, rc.Kind())
}
return resource, nil
}

func (rc *serviceResourceClient) Delete(namespace, name string, opts clients.DeleteOpts) error {
opts = opts.WithDefaults()
if !rc.exist(opts.Ctx, namespace, name) {
Expand Down
71 changes: 71 additions & 0 deletions pkg/api/shared/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package shared

import (
"bytes"
"fmt"

"github.com/golang/protobuf/jsonpb"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/errors"
)

// ApplyStatus is used by clients that don't support patch updates to resource statuses (e.g. consul, files, in-memory)
func ApplyStatus(rc clients.ResourceClient, statusClient resources.StatusClient, inputResource resources.InputResource, opts clients.ApplyStatusOpts) (resources.Resource, error) {
name := inputResource.GetMetadata().GetName()
namespace := inputResource.GetMetadata().GetNamespace()
res, err := rc.Read(namespace, name, clients.ReadOpts{
Ctx: opts.Ctx,
Cluster: opts.Cluster,
})

if err != nil {
return nil, errors.Wrapf(err, "error reading before applying status")
}

inputRes, ok := res.(resources.InputResource)
if !ok {
return nil, errors.Errorf("error converting resource of type %T to input resource to apply status", res)
}

statusClient.SetStatus(inputRes, statusClient.GetStatus(inputResource))
updatedRes, err := rc.Write(inputRes, clients.WriteOpts{
Ctx: opts.Ctx,
OverwriteExisting: true,
})

if err != nil {
return nil, errors.Wrapf(err, "error writing to apply status")
}
return updatedRes, nil
}

// GetJsonPatchData returns the json patch data for the input resource.
// Prefer using json patch for single api call status updates when supported (e.g. k8s) to avoid ratelimiting
// to the k8s apiserver (e.g. https://github.com/solo-io/gloo/blob/a083522af0a4ce22f4d2adf3a02470f782d5a865/projects/gloo/api/v1/settings.proto#L337-L350)
func GetJsonPatchData(inputResource resources.InputResource) ([]byte, error) {
namespacedStatuses := inputResource.GetNamespacedStatuses().GetStatuses()
if len(namespacedStatuses) != 1 {
// we only expect our namespace to report here; we don't want to blow away statuses from other reporters
return nil, errors.Errorf("unexpected number of namespaces in input resource: %v", len(inputResource.GetNamespacedStatuses().GetStatuses()))
}
ns := ""
for loopNs := range inputResource.GetNamespacedStatuses().GetStatuses() {
ns = loopNs
}
status := inputResource.GetNamespacedStatuses().GetStatuses()[ns]

buf := &bytes.Buffer{}
var marshaller jsonpb.Marshaler
marshaller.EnumsAsInts = false // prefer jsonpb over encoding/json marshaller since it renders enum as string not int (i.e., state is human-readable)
marshaller.EmitDefaults = false // keep status as small as possible
err := marshaller.Marshal(buf, status)
if err != nil {
return nil, errors.Wrapf(err, "marshalling input resource")
}

bytes := buf.Bytes()
patch := fmt.Sprintf(`[{"op": "replace", "path": "/status/statuses/%s", "value": %s}]`, ns, string(bytes)) // only replace our status so other reporters are not affected (e.g. blue-green of gloo)
data := []byte(patch)
return data, nil
}
Loading

0 comments on commit 618b1c9

Please sign in to comment.