Skip to content

Commit

Permalink
Add rate limiter to receiver and events
Browse files Browse the repository at this point in the history
Signed-off-by: Philip Laine <[email protected]>
  • Loading branch information
Philip Laine committed Mar 21, 2021
1 parent 763c291 commit 4927594
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 10 deletions.
8 changes: 7 additions & 1 deletion controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"net/http/httptest"
"path/filepath"
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sethvargo/go-limiter/memorystore"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -127,10 +129,14 @@ var _ = Describe("Event handlers", func() {
Expect(k8sClient.Create(ctx, &provider)).To(Succeed())

By("Creating and starting event server")
store, err := memorystore.New(&memorystore.Config{
Interval: 10 * time.Minute,
})
Expect(err).ShouldNot(HaveOccurred())
// TODO let OS assign port number
eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient)
stopCh = make(chan struct{})
go eventServer.ListenAndServe(stopCh)
go eventServer.ListenAndServe(stopCh, store)
})

AfterEach(func() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5
github.com/onsi/ginkgo v1.14.1
github.com/onsi/gomega v1.10.2
github.com/sethvargo/go-limiter v0.6.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
github.com/whilp/git-urls v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sethvargo/go-limiter v0.6.0 h1:186jmCdl1ItQUXbHFdTBrFSZztN6/bL9855C5jfMlKU=
github.com/sethvargo/go-limiter v0.6.0/go.mod h1:C0kbSFbiriE5k2FFOe18M1YZbAR2Fiwf72uGu0CXCcU=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
34 changes: 31 additions & 3 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ package server

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"

"github.com/go-logr/logr"
"github.com/sethvargo/go-limiter"
"github.com/sethvargo/go-limiter/httplimit"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/recorder"
)

// EventServer handles event POST requests
Expand All @@ -43,10 +50,13 @@ func NewEventServer(port string, logger logr.Logger, kubeClient client.Client) *
}

// ListenAndServe starts the HTTP server on the specified port
func (s *EventServer) ListenAndServe(stopCh <-chan struct{}) {
func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, store limiter.Store) error {
middleware, err := httplimit.NewMiddleware(store, eventKeyFunc)
if err != nil {
return err
}
mux := http.NewServeMux()
mux.HandleFunc("/", s.handleEvent())

mux.Handle("/", middleware.Handle(http.HandlerFunc(s.handleEvent())))
srv := &http.Server{
Addr: s.port,
Handler: mux,
Expand All @@ -69,4 +79,22 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}) {
} else {
s.logger.Info("Event server stopped")
}

return nil
}

func eventKeyFunc(r *http.Request) (string, error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return "", err
}
defer r.Body.Close()

event := &recorder.Event{}
err = json.Unmarshal(body, event)
if err != nil {
return "", err
}

return fmt.Sprintf("event/%s/%s", event.InvolvedObject.String(), event.Severity), nil
}
148 changes: 148 additions & 0 deletions internal/server/event_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/onsi/gomega"
"github.com/sethvargo/go-limiter/httplimit"
"github.com/sethvargo/go-limiter/memorystore"
corev1 "k8s.io/api/core/v1"

"github.com/fluxcd/pkg/recorder"
)

func TestEventKeyFunc(t *testing.T) {
g := gomega.NewGomegaWithT(t)

// Setup middleware
store, err := memorystore.New(&memorystore.Config{
Interval: 10 * time.Minute,
})
g.Expect(err).ShouldNot(gomega.HaveOccurred())
middleware, err := httplimit.NewMiddleware(store, eventKeyFunc)
g.Expect(err).ShouldNot(gomega.HaveOccurred())
handler := middleware.Handle(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))

// Make request
tests := []struct {
involvedObject corev1.ObjectReference
severity string
rateLimit bool
}{
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "1",
Namespace: "1",
},
severity: recorder.EventSeverityInfo,
rateLimit: false,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "1",
Namespace: "1",
},
severity: recorder.EventSeverityInfo,
rateLimit: true,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "1",
Namespace: "1",
},
severity: recorder.EventSeverityError,
rateLimit: true,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "1",
Namespace: "1",
},
severity: recorder.EventSeverityInfo,
rateLimit: true,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "2",
Namespace: "2",
},
severity: recorder.EventSeverityInfo,
rateLimit: false,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "3",
Namespace: "3",
},
severity: recorder.EventSeverityInfo,
rateLimit: false,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "2",
Namespace: "2",
},
severity: recorder.EventSeverityInfo,
rateLimit: true,
},
}
for _, tt := range tests {
t.Run(tt.involvedObject.String(), func(t *testing.T) {
event := recorder.Event{
InvolvedObject: tt.involvedObject,
Severity: tt.severity,
}
eventData, err := json.Marshal(event)
g.Expect(err).ShouldNot(gomega.HaveOccurred())

req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
g.Expect(err).ShouldNot(gomega.HaveOccurred())
res := httptest.NewRecorder()
handler.ServeHTTP(res, req)

if tt.rateLimit {
g.Expect(res.Code).Should(gomega.Equal(429))
g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(gomega.Equal("0"))
} else {
g.Expect(res.Code).Should(gomega.Equal(200))
}
})
}
}
22 changes: 18 additions & 4 deletions internal/server/receiver_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ package server

import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/sethvargo/go-limiter"
"github.com/sethvargo/go-limiter/httplimit"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -43,11 +48,13 @@ func NewReceiverServer(port string, logger logr.Logger, kubeClient client.Client
}

// ListenAndServe starts the HTTP server on the specified port
func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}) {
func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}, store limiter.Store) error {
middleware, err := httplimit.NewMiddleware(store, receiverKeyFunc)
if err != nil {
return err
}
mux := http.DefaultServeMux

mux.HandleFunc("/hook/", s.handlePayload())

mux.Handle("/hook/", middleware.Handle(http.HandlerFunc(s.handlePayload())))
srv := &http.Server{
Addr: s.port,
Handler: mux,
Expand All @@ -70,4 +77,11 @@ func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}) {
} else {
s.logger.Info("Receiver server stopped")
}

return nil
}

func receiverKeyFunc(r *http.Request) (string, error) {
digest := url.PathEscape(strings.TrimLeft(r.RequestURI, "/hook/"))
return fmt.Sprintf("receiver/%s", digest), nil
}
86 changes: 86 additions & 0 deletions internal/server/receiver_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/onsi/gomega"
"github.com/sethvargo/go-limiter/httplimit"
"github.com/sethvargo/go-limiter/memorystore"
)

func TestReceiverKeyFunc(t *testing.T) {
g := gomega.NewGomegaWithT(t)

// Setup middleware
store, err := memorystore.New(&memorystore.Config{
Interval: 10 * time.Minute,
})
g.Expect(err).ShouldNot(gomega.HaveOccurred())
middleware, err := httplimit.NewMiddleware(store, receiverKeyFunc)
g.Expect(err).ShouldNot(gomega.HaveOccurred())
handler := middleware.Handle(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))

// Make request
tests := []struct {
digest string
rateLimit bool
}{
{
digest: "1",
rateLimit: false,
},
{
digest: "1",
rateLimit: true,
},
{
digest: "2",
rateLimit: false,
},
{
digest: "3",
rateLimit: false,
},
{
digest: "2",
rateLimit: true,
},
}
for _, tt := range tests {
t.Run(tt.digest, func(t *testing.T) {
req := httptest.NewRequest("GET", fmt.Sprintf("/hook/%s", tt.digest), nil)
g.Expect(err).ShouldNot(gomega.HaveOccurred())
res := httptest.NewRecorder()
handler.ServeHTTP(res, req)

if tt.rateLimit {
g.Expect(res.Code).Should(gomega.Equal(429))
g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(gomega.Equal("0"))
} else {
g.Expect(res.Code).Should(gomega.Equal(200))
}
})
}
}
Loading

0 comments on commit 4927594

Please sign in to comment.