Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Improved Schema Definition Cache #1

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ replace (
require (
github.com/adrg/xdg v0.4.0
github.com/golang/mock v1.6.0
github.com/google/gnostic v0.5.7-v3refs
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/pborman/uuid v1.2.1
Expand All @@ -28,6 +29,7 @@ require (
github.com/urfave/cli v1.22.14
github.com/urfave/cli/v2 v2.25.7
golang.org/x/sync v0.3.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.27.4
k8s.io/apiextensions-apiserver v0.27.4
k8s.io/apimachinery v0.27.4
Expand Down Expand Up @@ -56,7 +58,6 @@ require (
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand Down Expand Up @@ -100,7 +101,6 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/component-base v0.27.4 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
Expand Down
51 changes: 51 additions & 0 deletions pkg/debounce/refresher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package debounce

import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
)

// Refreshable represents an object which can be refreshed. This should be protected by a mutex for concurrent operation.
type Refreshable interface {
Refresh() error
}

// DebouncableRefresher is used to debounce multiple attempts to refresh a refreshable type.
type DebouncableRefresher struct {
sync.Mutex
// Refreshable is any type that can be refreshed. The refresh method should by protected by a mutex internally.
Refreshable Refreshable
current context.CancelFunc
}

// RefreshAfter requests a refresh after a certain time has passed. Subsequent calls to this method will
// delay the requested refresh by the new duration. Note that this is a total override of the previous calls - calling
// RefreshAfter(time.Second * 2) and then immediatley calling RefreshAfter(time.Microsecond * 1) will run a refresh
// in one microsecond
func (d *DebouncableRefresher) RefreshAfter(duration time.Duration) {
d.Lock()
defer d.Unlock()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
if d.current != nil {
d.current()
}
d.current = cancel
go func() {
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-ctx.Done():
// this indicates that the context was cancelled. Do nothing.
case <-timer.C:
// note this can cause multiple refreshes to happen concurrently
err := d.Refreshable.Refresh()
if err != nil {
logrus.Errorf("failed to refresh with error: %v", err)
}
}
}()
}
44 changes: 44 additions & 0 deletions pkg/debounce/refresher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package debounce

import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type refreshable struct {
wasRefreshed atomic.Bool
retErr error
}

func (r *refreshable) Refresh() error {
r.wasRefreshed.Store(true)
return r.retErr
}

func TestRefreshAfter(t *testing.T) {
ref := refreshable{}
debounce := DebouncableRefresher{
Refreshable: &ref,
}
debounce.RefreshAfter(time.Millisecond * 2)
debounce.RefreshAfter(time.Microsecond * 2)
time.Sleep(time.Millisecond * 1)
require.True(t, ref.wasRefreshed.Load())
ref.wasRefreshed.Store(false)
time.Sleep(time.Millisecond * 2)
require.False(t, ref.wasRefreshed.Load())

ref = refreshable{
retErr: fmt.Errorf("Some error"),
}
debounce = DebouncableRefresher{
Refreshable: &ref,
}
debounce.RefreshAfter(time.Microsecond * 2)
time.Sleep(time.Millisecond * 1)
require.True(t, ref.wasRefreshed.Load())
}
3 changes: 1 addition & 2 deletions pkg/resources/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/rancher/steve/pkg/resources/formatters"
"github.com/rancher/steve/pkg/resources/userpreferences"
"github.com/rancher/steve/pkg/schema"
steveschema "github.com/rancher/steve/pkg/schema"
"github.com/rancher/steve/pkg/stores/proxy"
"github.com/rancher/steve/pkg/summarycache"
corecontrollers "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
Expand All @@ -25,7 +24,7 @@ import (
)

func DefaultSchemas(ctx context.Context, baseSchema *types.APISchemas, ccache clustercache.ClusterCache,
cg proxy.ClientGetter, schemaFactory steveschema.Factory, serverVersion string) error {
cg proxy.ClientGetter, schemaFactory schema.Factory, serverVersion string) error {
counts.Register(baseSchema, ccache)
subscribe.Register(baseSchema, func(apiOp *types.APIRequest) *types.APISchemas {
user, ok := request.UserFrom(apiOp.Context())
Expand Down
22 changes: 6 additions & 16 deletions pkg/schema/converter/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ var (
}
)

func AddCustomResources(crd apiextv1.CustomResourceDefinitionClient, schemas map[string]*types.APISchema) error {
// addCustomResources uses the openAPISchema defined on CRDs to provide field definitions to previously discovered schemas.
// Note that this function does not create new schemas - it only adds details to resources already present in the schemas map.
func addCustomResources(crd apiextv1.CustomResourceDefinitionClient, schemas map[string]*types.APISchema) error {
crds, err := crd.List(metav1.ListOptions{})
if err != nil {
return nil
Expand All @@ -41,14 +43,14 @@ func AddCustomResources(crd apiextv1.CustomResourceDefinitionClient, schemas map
group, kind := crd.Spec.Group, crd.Status.AcceptedNames.Kind

for _, version := range crd.Spec.Versions {
forVersion(&crd, group, kind, version, schemas)
forVersion(group, kind, version, schemas)
}
}

return nil
}

func forVersion(crd *v1.CustomResourceDefinition, group, kind string, version v1.CustomResourceDefinitionVersion, schemasMap map[string]*types.APISchema) {
func forVersion(group, kind string, version v1.CustomResourceDefinitionVersion, schemasMap map[string]*types.APISchema) {
var versionColumns []table.Column
for _, col := range version.AdditionalPrinterColumns {
versionColumns = append(versionColumns, table.Column{
Expand All @@ -73,18 +75,6 @@ func forVersion(crd *v1.CustomResourceDefinition, group, kind string, version v1
attributes.SetColumns(schema, versionColumns)
}
if version.Schema != nil && version.Schema.OpenAPIV3Schema != nil {
if fieldsSchema := modelV3ToSchema(id, crd.Spec.Versions[0].Schema.OpenAPIV3Schema, schemasMap); fieldsSchema != nil {
for k, v := range staticFields {
fieldsSchema.ResourceFields[k] = v
}
for k, v := range fieldsSchema.ResourceFields {
if schema.ResourceFields == nil {
schema.ResourceFields = map[string]schemas.Field{}
}
if _, ok := schema.ResourceFields[k]; !ok {
schema.ResourceFields[k] = v
}
}
}
schema.Description = version.Schema.OpenAPIV3Schema.Description
}
}
Loading