Skip to content

Commit

Permalink
defaults implementation (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene authored Feb 25, 2022
1 parent dac1a45 commit ffb3773
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 11 deletions.
11 changes: 11 additions & 0 deletions pkg/api/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package api

type Defaults struct {
API string `json:"api" yaml:"api"`
Kind string `json:"kind" yaml:"kind"`
Metadata Metadata `json:"metadata" yaml:"metadata"`
Spec DefaultsSpec `json:"spec" yaml:"spec"`
}
type DefaultsSpec struct {
ConnectTimeout int64 `json:"connectTimeout" yaml:"connectTimeout"`
}
39 changes: 29 additions & 10 deletions pkg/envoy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
upstreams "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
cacheTypes "github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/golang/protobuf/ptypes"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
)

type Cluster struct{}
const PRESET_CONNECT_TIMEOUT_SECONDS = 2

type Cluster struct {
DefaultsParams DefaultsParams
}

func newCluster() *Cluster {
return &Cluster{}
Expand Down Expand Up @@ -49,7 +52,7 @@ func (c *Cluster) getAllClusterNames(clusters []cacheTypes.Resource) []string {
func (c *Cluster) createCluster(params ClusterParams) *api.Cluster {
var transportSocket *core.TransportSocket
if params.Port == 443 {
tlsContext, err := ptypes.MarshalAny(&tls.UpstreamTlsContext{
tlsContext, err := anypb.New(&tls.UpstreamTlsContext{
Sni: params.TargetHostname,
})
if err != nil {
Expand All @@ -75,10 +78,14 @@ func (c *Cluster) createCluster(params ClusterParams) *api.Cluster {
},
}}

connectTimeout := 2 * time.Second
connectTimeout := PRESET_CONNECT_TIMEOUT_SECONDS * time.Second

if params.ConnectTimeout > 0 {
connectTimeout = time.Duration(params.ConnectTimeout) * time.Second
} else { // set default if defined
if c.DefaultsParams.ConnectTimeout > 0 {
connectTimeout = time.Duration(params.ConnectTimeout) * time.Second
}
}

// add healthchecks
Expand All @@ -95,8 +102,8 @@ func (c *Cluster) createCluster(params ClusterParams) *api.Cluster {
}

healthCheck := &core.HealthCheck{
Timeout: ptypes.DurationProto(healthcheckTimeout),
Interval: ptypes.DurationProto(healthcheckInterval),
Timeout: durationpb.New(healthcheckTimeout),
Interval: durationpb.New(healthcheckInterval),
UnhealthyThreshold: &wrappers.UInt32Value{Value: params.HealthCheck.UnhealthyThreshold},
HealthyThreshold: &wrappers.UInt32Value{Value: params.HealthCheck.HealthyThreshold},
HealthChecker: &core.HealthCheck_HttpHealthCheck_{
Expand All @@ -109,7 +116,7 @@ func (c *Cluster) createCluster(params ClusterParams) *api.Cluster {
// optional parameters
if params.HealthCheck.UnhealthyInterval != "" {
if healthCheckUnhealthyInterval, err := time.ParseDuration(params.HealthCheck.UnhealthyInterval); err == nil {
healthCheck.UnhealthyInterval = ptypes.DurationProto(healthCheckUnhealthyInterval)
healthCheck.UnhealthyInterval = durationpb.New(healthCheckUnhealthyInterval)
}
}

Expand All @@ -122,7 +129,7 @@ func (c *Cluster) createCluster(params ClusterParams) *api.Cluster {
ClusterDiscoveryType: &api.Cluster_Type{
Type: api.Cluster_STRICT_DNS,
},
ConnectTimeout: ptypes.DurationProto(connectTimeout),
ConnectTimeout: durationpb.New(connectTimeout),
DnsLookupFamily: api.Cluster_V4_ONLY,
LbPolicy: api.Cluster_ROUND_ROBIN,
HealthChecks: healthChecks,
Expand Down Expand Up @@ -165,7 +172,7 @@ func (c *Cluster) createCluster(params ClusterParams) *api.Cluster {
},
},
}
typedExtensionProtocolOptionsEncoded, err := ptypes.MarshalAny(typedExtensionProtocolOptions)
typedExtensionProtocolOptionsEncoded, err := anypb.New(typedExtensionProtocolOptions)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -209,3 +216,15 @@ func (c *Cluster) PrintCluster(cache *WorkQueueCache, name string) (string, erro
}
return out, fmt.Errorf("Cluster not found")
}

func (c *Cluster) updateDefaults(clusters []cacheTypes.Resource, params DefaultsParams) error {
c.DefaultsParams.ConnectTimeout = params.ConnectTimeout
for k := range clusters {
cluster := clusters[k].(*api.Cluster)
if cluster.ConnectTimeout.Seconds == PRESET_CONNECT_TIMEOUT_SECONDS {
cluster.ConnectTimeout = durationpb.New((time.Duration(c.DefaultsParams.ConnectTimeout) * time.Second))
clusters[k] = cluster
}
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/envoy/testdata/test-cluster-connection-timeout.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
api: proxy.in4it.io/v1
kind: rule
metadata:
name: test-cluster
name: test-cluster-connectiontimeout
spec:
conditions:
- hostname: test.example.com
Expand Down
6 changes: 6 additions & 0 deletions pkg/envoy/testdata/test-defaults.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
api: proxy.in4it.io/v1
kind: defaults
metadata:
name: test-defaults
spec:
connectTimeout: 15
6 changes: 6 additions & 0 deletions pkg/envoy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type WorkQueueItem struct {
RateLimitParams RateLimitParams
MTLSParams MTLSParams
LuaFilterParams LuaFilterParams
DefaultsParams DefaultsParams
state string
}

Expand Down Expand Up @@ -204,3 +205,8 @@ type LuaFilterParams struct {
InlineCode string
Listener ListenerParamsListener
}

type DefaultsParams struct {
Name string
ConnectTimeout int64
}
9 changes: 9 additions & 0 deletions pkg/envoy/workqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ func (w *WorkQueue) Submit(items []WorkQueueItem) (string, error) {
item.state = "finished"
}
updateXds = true
case "updateDefaults":
err := w.cluster.updateDefaults(w.cache.clusters, item.DefaultsParams)
if err != nil {
item.state = "error"
logger.Errorf("updateDefaults error: %s", err)
} else {
item.state = "finished"
}
updateXds = true
case "updateListenerWithChallenge":
err := w.listener.updateListenerWithChallenge(&w.cache, item.ChallengeParams)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/envoy/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ func (x *XDS) ImportObject(object pkgApi.Object) ([]WorkQueueItem, error) {
return []WorkQueueItem{}, fmt.Errorf("Couldn't import new rule: %s", err)
}
return items, nil
case "defaults":
defaults := object.Data.(pkgApi.Defaults)
items, err := x.importDefaults(defaults)
if err != nil {
return []WorkQueueItem{}, fmt.Errorf("Couldn't import new rule: %s", err)
}
return items, nil
}

return []WorkQueueItem{}, nil
Expand Down Expand Up @@ -398,6 +405,18 @@ func (x *XDS) importLuaFilter(luaFilter pkgApi.LuaFilter) ([]WorkQueueItem, erro
}, nil
}

func (x *XDS) importDefaults(defaults pkgApi.Defaults) ([]WorkQueueItem, error) {
return []WorkQueueItem{
{
Action: "updateDefaults",
DefaultsParams: DefaultsParams{
Name: defaults.Metadata.Name,
ConnectTimeout: defaults.Spec.ConnectTimeout,
},
},
}, nil
}

func (x *XDS) importRateLimit(rateLimit pkgApi.RateLimit) ([]WorkQueueItem, error) {
var descriptors []RateLimitDescriptor
for _, descriptor := range rateLimit.Spec.Descriptors {
Expand Down
40 changes: 40 additions & 0 deletions pkg/envoy/xds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,3 +1421,43 @@ func TestRuleWithConnectionTimeout(t *testing.T) {
}
}
}

func TestRuleWithDefaults(t *testing.T) {
logger.SetLogLevel(loggo.DEBUG)
s, err := initStorage()
if err != nil {
t.Errorf("Couldn't initialize storage: %s", err)
return
}
x := NewXDS(s, "", "")
ObjectFileNames := []string{"test-cluster-connection-timeout.yaml", "test-cluster-1.yaml", "test-defaults.yaml"}
for _, filename := range ObjectFileNames {
newItems, err := x.putObject(filename)
if err != nil {
t.Errorf("PutObject failed: %s", err)
return
}
_, err = x.workQueue.Submit(newItems)
if err != nil {
t.Errorf("WorkQueue error: %s", err)
return
}
}
allClusters := x.workQueue.cache.clusters
checks := []bool{}
for _, v := range allClusters {
cluster := v.(*clusterAPI.Cluster)
if cluster.Name == "test-cluster-connectiontimeout" || cluster.Name == "test-cluster" {
checks = append(checks, true)
}
if cluster.Name == "test-cluster-connectiontimeout" && cluster.ConnectTimeout.Seconds != 5 {
t.Errorf("Cluster Connect timeout is not 5 (got %d)", cluster.ConnectTimeout.Seconds)
}
if cluster.Name == "test-cluster" && cluster.ConnectTimeout.Seconds != 15 {
t.Errorf("Cluster Connect timeout is not 15 (got %d)", cluster.ConnectTimeout.Seconds)
}
}
if len(checks) != 2 {
t.Errorf("Clusters not found")
}
}
7 changes: 7 additions & 0 deletions pkg/storage/local/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ func (l *LocalStorage) GetObject(name string) ([]api.Object, error) {
return objects, err
}
object.Data = luaFilter
case "defaults":
var defaults api.Defaults
err = yaml.Unmarshal([]byte(contentsSplitted), &defaults)
if err != nil {
return objects, err
}
object.Data = defaults
default:
return objects, errors.New("Rule in wrong format")
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/s3/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ func (s *S3Storage) GetObject(filename string) ([]api.Object, error) {
return objects, err
}
object.Data = luaFilter
case "defaults":
var defaults api.Defaults
err = yaml.Unmarshal([]byte(contentsSplitted), &defaults)
if err != nil {
return objects, err
}
object.Data = defaults
default:
return objects, errors.New("Object in wrong format")
}
Expand Down

0 comments on commit ffb3773

Please sign in to comment.