diff --git a/Gopkg.lock b/Gopkg.lock index 7aac69fbf4..6a91c09b19 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -181,6 +181,12 @@ ] revision = "4d347d79dea0067c945f374f990601decb08abb5" +[[projects]] + branch = "master" + name = "github.com/mattbaird/jsonpatch" + packages = ["."] + revision = "81af80346b1a01caae0cbc27fd3c1ba5b11e189f" + [[projects]] branch = "master" name = "github.com/mitchellh/mapstructure" @@ -362,6 +368,7 @@ [[projects]] name = "k8s.io/api" packages = [ + "admission/v1beta1", "admissionregistration/v1alpha1", "admissionregistration/v1beta1", "apps/v1", @@ -616,6 +623,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "faa21da15f8a0ce6e5c6faafd268ac059c252eac32d222215645cdc308e61c70" + inputs-digest = "e4e6f9000b679617c61b4af906948d2587e5105ea38aac81dd788caf502ffd27" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index b25c511608..abbf4da474 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -42,4 +42,8 @@ [[constraint]] name = "google.golang.org/grpc" - version = "1.8.0" \ No newline at end of file + version = "1.8.0" + +[[constraint]] + branch = "master" + name = "github.com/mattbaird/jsonpatch" diff --git a/build/Makefile b/build/Makefile index 3357691970..32de45ec92 100644 --- a/build/Makefile +++ b/build/Makefile @@ -261,6 +261,7 @@ clean-gcloud-config: # (defaults virtualbox for Linux and macOS, hyperv for windows) if you so desire. minikube-test-cluster: minikube-agones-profile $(MINIKUBE) start --kubernetes-version v1.9.0 --vm-driver $(MINIKUBE_DRIVER) \ + --extra-config=apiserver.Admission.PluginNames=NamespaceLifecycle,LimitRanger,ServiceAccount,PersistentVolumeLabel,DefaultStorageClass,DefaultTolerationSeconds,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota \ --extra-config=apiserver.Authorization.Mode=RBAC # wait until the master is up until docker run --rm $(common_mounts) --network=host -v $(minikube_cert_mount) $(DOCKER_RUN_ARGS) $(build_tag) kubectl apiVersion: v1
kind: Service
metadata:
  name: agones-controller-service
  namespace: agones-system
spec:
  selector:
    stable.agones.dev/role: controller
  ports:
    - port: 443
      targetPort: 8081
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
  name: gameserver-mutation-webhook
  namespace: agones-system
webhooks:
  - name: gameserver-mutations.stable.agones.dev
    failurePolicy: Fail
    clientConfig:
      service:
        name: agones-controller-service
        namespace: agones-system
        path: /mutate-gameserver + rules:
    - apiGroups:
      - stable.agones.dev
      resources:
      - "gameservers"
      apiVersions:
      - "v1alpha1"
      operations:
      - CREATE
--- main() { + exec, err := os.Executable() + if err != nil { + logrus.WithError(err).Fatal("Could not get executable path") + } + + base := filepath.Dir(exec) viper.SetDefault(sidecarFlag, "gcr.io/agones-images/agones-sdk:"+pkg.Version) viper.SetDefault(pullSidecarFlag, false) + viper.SetDefault(certFileFlag, filepath.Join(base, "certs/server.crt")) + viper.SetDefault(keyFileFlag, filepath.Join(base, "certs/server.key")) pflag.String(sidecarFlag, viper.GetString(sidecarFlag), "Flag to overwrite the GameServer sidecar image that is used. Can also use SIDECAR env variable") pflag.Bool(pullSidecarFlag, viper.GetBool(pullSidecarFlag), "For development purposes, set the sidecar image to have a ImagePullPolicy of Always. Can also use ALWAYS_PULL_SIDECAR env variable") pflag.Int32(minPortFlag, 0, "Required. The minimum port that that a GameServer can be allocated to. Can also use MIN_PORT env variable.") pflag.Int32(maxPortFlag, 0, "Required. The maximum port that that a GameServer can be allocated to. Can also use MAX_PORT env variable") + pflag.String(keyFileFlag, viper.GetString(certFileFlag), "Optional. Path to the key file") + pflag.String(certFileFlag, viper.GetString(certFileFlag), "Optional. Path to the crt file") pflag.Parse() viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) @@ -61,16 +76,22 @@ func main() { runtime.Must(viper.BindEnv(pullSidecarFlag)) runtime.Must(viper.BindEnv(minPortFlag)) runtime.Must(viper.BindEnv(maxPortFlag)) + runtime.Must(viper.BindEnv(keyFileFlag)) + runtime.Must(viper.BindEnv(certFileFlag)) runtime.Must(viper.BindPFlags(pflag.CommandLine)) minPort := int32(viper.GetInt64(minPortFlag)) maxPort := int32(viper.GetInt64(maxPortFlag)) sidecarImage := viper.GetString(sidecarFlag) alwaysPullSidecar := viper.GetBool(pullSidecarFlag) + keyFile := viper.GetString(keyFileFlag) + certFile := viper.GetString(certFileFlag) logrus.WithField(sidecarFlag, sidecarImage). WithField("minPort", minPort). WithField("maxPort", maxPort). + WithField(keyFileFlag, keyFile). + WithField(certFileFlag, certFile). WithField("alwaysPullSidecarImage", alwaysPullSidecar). WithField("Version", pkg.Version).Info("starting gameServer operator...") @@ -102,13 +123,21 @@ func main() { agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, 30*time.Second) kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second) - c := gameservers.NewController(minPort, maxPort, sidecarImage, alwaysPullSidecar, kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory) + + wh := webhooks.NewWebHook(certFile, keyFile) + c := gameservers.NewController(wh, minPort, maxPort, sidecarImage, alwaysPullSidecar, kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory) stop := signals.NewStopChannel() kubeInformationFactory.Start(stop) agonesInformerFactory.Start(stop) + go func() { + if err := wh.Run(stop); err != nil { // nolint: vetshadow + logrus.WithError(err).Fatal("could not run webhook server") + } + }() + err = c.Run(2, stop) if err != nil { logrus.WithError(err).Fatal("Could not run gameserver controller") diff --git a/install.yaml b/install.yaml index 970c771574..16a8887985 100644 --- a/install.yaml +++ b/install.yaml @@ -66,6 +66,42 @@ spec: initialDelaySeconds: 3 periodSeconds: 3 --- +apiVersion: v1 +kind: Service +metadata: + name: agones-controller-service + namespace: agones-system +spec: + selector: + stable.agones.dev/role: controller + ports: + - port: 443
      targetPort: 8081
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
  name: gameserver-mutation-webhook
  namespace: agones-system
webhooks:
  - name: gameserver-mutations.stable.agones.dev
    failurePolicy: Fail
    clientConfig:
      service:
        name: agones-controller-service
        namespace: agones-system
        path: /mutate-gameserver
    rules:
    - apiGroups: + - stable.agones.dev + resources: + - "gameservers" + apiVersions: + - "v1alpha1" + operations: + - CREATE +--- # Service account, secret, role and rolebinding for sidecar (agones-sdk) pod apiVersion: v1 kind: ServiceAccount diff --git a/pkg/apis/stable/v1alpha1/types.go b/pkg/apis/stable/v1alpha1/types.go index bb186ef4e7..784d7d3d2f 100644 --- a/pkg/apis/stable/v1alpha1/types.go +++ b/pkg/apis/stable/v1alpha1/types.go @@ -23,8 +23,10 @@ import ( ) const ( - // Creating is when the Pod for the GameServer is being created, - // but they have yet to register themselves yet as Ready + // PortAllocation is for when a dynamically allocating GameServer + // is being created, an open port needs to be allocated + PortAllocation State = "PortAllocation" + // Creating is before the Pod for the GameServer is being created Creating State = "Creating" // Starting is for when the Pods for the GameServer are being // created but have yet to register themselves as Ready @@ -161,7 +163,11 @@ func (gs *GameServer) ApplyDefaults() { } if gs.Status.State == "" { - gs.Status.State = Creating + if gs.Spec.PortPolicy == Dynamic { + gs.Status.State = PortAllocation + } else { + gs.Status.State = Creating + } } // health @@ -178,6 +184,25 @@ func (gs *GameServer) ApplyDefaults() { } } +// Validate validates the GameServer configuration. +// If a GameServer is invalid there will be > 0 values in +// the returned array +func (gs *GameServer) Validate() []metav1.StatusCause { + var causes []metav1.StatusCause + + // make sure the container value points to a valid container + _, _, err := gs.FindGameServerContainer() + if err != nil { + causes = append(causes, metav1.StatusCause{ + Type: metav1.CauseTypeFieldValueInvalid, + Field: "container", + Message: err.Error(), + }) + } + + return causes +} + // FindGameServerContainer returns the container that is specified in // spec.gameServer.container. Returns the index and the value. // Returns an error if not found diff --git a/pkg/apis/stable/v1alpha1/types_test.go b/pkg/apis/stable/v1alpha1/types_test.go index 05f3c99008..32b86f3900 100644 --- a/pkg/apis/stable/v1alpha1/types_test.go +++ b/pkg/apis/stable/v1alpha1/types_test.go @@ -73,7 +73,7 @@ func TestGameServerApplyDefaults(t *testing.T) { container: "testing", expected: expected{ protocol: "UDP", - state: Creating, + state: PortAllocation, policy: Dynamic, health: Health{ Disabled: false, @@ -115,6 +115,26 @@ func TestGameServerApplyDefaults(t *testing.T) { }, }, }, + "set basic defaults on static gameserver": { + gameServer: GameServer{ + Spec: GameServerSpec{ + PortPolicy: Static, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "testing", Image: "testing/image"}}}}}, + }, + container: "testing", + expected: expected{ + protocol: "UDP", + state: Creating, + policy: Static, + health: Health{ + Disabled: false, + FailureThreshold: 3, + InitialDelaySeconds: 5, + PeriodSeconds: 5, + }, + }, + }, "health is disabled": { gameServer: GameServer{ Spec: GameServerSpec{ @@ -125,7 +145,7 @@ func TestGameServerApplyDefaults(t *testing.T) { container: "testing", expected: expected{ protocol: "UDP", - state: Creating, + state: PortAllocation, policy: Dynamic, health: Health{ Disabled: true, @@ -148,6 +168,27 @@ func TestGameServerApplyDefaults(t *testing.T) { } } +func TestGameServerValidate(t *testing.T) { + gs := GameServer{ + Spec: GameServerSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "testing", Image: "testing/image"}}}}}, + } + gs.ApplyDefaults() + assert.Empty(t, gs.Validate()) + + gs = GameServer{ + Spec: GameServerSpec{ + Container: "nope", + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "testing", Image: "testing/image"}}}}}, + } + causes := gs.Validate() + assert.Len(t, causes, 1) + assert.Equal(t, causes[0].Field, "container") + assert.Equal(t, causes[0].Type, metav1.CauseTypeFieldValueInvalid) +} + func TestGameServerPod(t *testing.T) { fixture := &GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default", UID: "1234"}, Spec: GameServerSpec{ diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 98c8e4be71..72861f00d7 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -15,6 +15,7 @@ package gameservers import ( + "encoding/json" "fmt" "net/http" "time" @@ -26,8 +27,11 @@ import ( "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/webhooks" + "github.com/mattbaird/jsonpatch" "github.com/pkg/errors" "github.com/sirupsen/logrus" + admv1beta1 "k8s.io/api/admission/v1beta1" corev1 "k8s.io/api/core/v1" apiv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" extclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" @@ -72,7 +76,9 @@ type Controller struct { } // NewController returns a new gameserver crd controller -func NewController(minPort, maxPort int32, +func NewController( + wh *webhooks.WebHook, + minPort, maxPort int32, sidecarImage string, alwaysPullSidecarImage bool, kubeClient kubernetes.Interface, @@ -105,6 +111,8 @@ func NewController(minPort, maxPort int32, recorder: recorder, } + wh.AddHandler("/mutate-gameserver", c.mutationHandler) + gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueGameServer, UpdateFunc: func(oldObj, newObj interface{}) { @@ -148,6 +156,73 @@ func NewController(minPort, maxPort int32, return c } +// mutationHandler is the handler for the mutating webhook that sets the +// the default values on the GameServer, and validates the results +func (c *Controller) mutationHandler(review admv1beta1.AdmissionReview) (admv1beta1.AdmissionReview, error) { + logrus.WithField("review", review).Info("mutationHandler") + + if !(review.Request.Operation == admv1beta1.Create && + review.Request.Kind.Group == stable.GroupName && + review.Request.Kind.Kind == "GameServer") { + + logrus.WithField("review", review).Warn("Skipping mutationHandler, because invalid operation") + return review, nil + } + + obj := review.Request.Object + gs := &stablev1alpha1.GameServer{} + err := json.Unmarshal(obj.Raw, gs) + if err != nil { + return review, errors.Wrapf(err, "error unmarshalling original GameServer json: %s", obj.Raw) + } + + // This is the main logic of this function + // the rest is really just json plumbing + gs.ApplyDefaults() + causes := gs.Validate() + if len(causes) > 0 { + review.Response.Allowed = false + details := metav1.StatusDetails{ + Name: review.Request.Name, + Group: review.Request.Kind.Group, + Kind: review.Request.Kind.Kind, + Causes: causes, + } + review.Response.Result = &metav1.Status{ + Status: metav1.StatusFailure, + Message: "GameServer configuration is invalid: " + details.String(), + Reason: metav1.StatusReasonInvalid, + Details: &details, + } + + logrus.WithField("review", review).Info("Invalid GameServer") + return review, nil + } + + new, err := json.Marshal(gs) + if err != nil { + return review, errors.Wrapf(err, "error marshalling default applied GameSever %s to json", gs.ObjectMeta.Name) + } + + patch, err := jsonpatch.CreatePatch(obj.Raw, new) + if err != nil { + return review, errors.Wrapf(err, "error creating patch for GameServer %s", gs.ObjectMeta.Name) + } + + json, err := json.Marshal(patch) + if err != nil { + return review, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name) + } + + logrus.WithField("gs", gs.ObjectMeta.Name).WithField("patch", string(json)).Infof("patch created!") + + pt := admv1beta1.PatchTypeJSONPatch + review.Response.PatchType = &pt + review.Response.Patch = json + + return review, nil +} + // Run the GameServer controller. Will block until stop is closed. // Runs threadiness number workers to process the rate limited queue func (c Controller) Run(threadiness int, stop <-chan struct{}) error { @@ -269,7 +344,7 @@ func (c *Controller) syncGameServer(key string) error { if gs, err = c.syncGameServerDeletionTimestamp(gs); err != nil { return err } - if gs, err = c.syncGameServerBlankState(gs); err != nil { + if gs, err = c.syncGameServerPortAllocationState(gs); err != nil { return err } if gs, err = c.syncGameServerCreatingState(gs); err != nil { @@ -326,33 +401,31 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *stablev1alpha1.GameServ return gs, errors.Wrapf(err, "error removing finalizer for GameServer %s", gsCopy.ObjectMeta.Name) } -// syncGameServerBlankState applies default values to the the GameServer if its state is "" (blank) -// returns an updated GameServer -func (c *Controller) syncGameServerBlankState(gs *stablev1alpha1.GameServer) (*stablev1alpha1.GameServer, error) { - if !(gs.Status.State == "" && gs.ObjectMeta.DeletionTimestamp.IsZero()) { +// syncGameServerPortAllocationState gives a port to a dynamically allocating GameServer +func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameServer) (*stablev1alpha1.GameServer, error) { + if !(gs.Status.State == stablev1alpha1.PortAllocation && gs.ObjectMeta.DeletionTimestamp.IsZero()) { return gs, nil } gsCopy := gs.DeepCopy() - gsCopy.ApplyDefaults() - // manage dynamic ports - if gsCopy.Spec.PortPolicy == stablev1alpha1.Dynamic { - port, err := c.portAllocator.Allocate() - if err != nil { - return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name) - } - gsCopy.Spec.HostPort = port + port, err := c.portAllocator.Allocate() + if err != nil { + return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name) } + gsCopy.Spec.HostPort = port + gsCopy.Status.State = stablev1alpha1.Creating + c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated") - logrus.WithField("gs", gsCopy).Info("Syncing Blank State") - var err error + logrus.WithField("gs", gsCopy).Info("Syncing Port Allocation State") gs, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy) if err != nil { + // if the GameServer doesn't get updated with the port data, then put the port + // back in the pool, as it will get retried on the next pass + c.portAllocator.DeAllocate(port) return gs, errors.Wrapf(err, "error updating GameServer %s to default values", gs.Name) } - c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Defaults applied") return gs, nil } diff --git a/pkg/gameservers/controller_test.go b/pkg/gameservers/controller_test.go index 1c4f01a546..68ea03176e 100644 --- a/pkg/gameservers/controller_test.go +++ b/pkg/gameservers/controller_test.go @@ -20,11 +20,16 @@ import ( "sync" "testing" "time" + "encoding/json" "agones.dev/agones/pkg/apis/stable" "agones.dev/agones/pkg/apis/stable/v1alpha1" + "agones.dev/agones/pkg/webhooks" + "github.com/json-iterator/go" + "github.com/mattbaird/jsonpatch" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + admv1beta1 "k8s.io/api/admission/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -86,9 +91,21 @@ func TestSyncGameServer(t *testing.T) { c, mocks := newFakeController() updateCount := 0 podCreated := false - fixture := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, - Spec: newSingleContainerSpec()} + fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: v1alpha1.GameServerSpec{ + ContainerPort: 7777, + Template: corev1.PodTemplateSpec{Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "container", Image: "container/image"}}, + }, + }, + }, + } + + fixture.ApplyDefaults() + mocks.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, &corev1.NodeList{Items: []corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}}}, nil + }) mocks.kubeClient.AddReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { ca := action.(k8stesting.CreateAction) pod := ca.GetObject().(*corev1.Pod) @@ -97,7 +114,7 @@ func TestSyncGameServer(t *testing.T) { return false, pod, nil }) mocks.agonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { - gameServers := &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{fixture}} + gameServers := &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{*fixture}} return true, gameServers, nil }) mocks.agonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { @@ -117,10 +134,12 @@ func TestSyncGameServer(t *testing.T) { return true, gs, nil }) - _, cancel := startInformers(mocks, c.gameServerSynced) + stop, cancel := startInformers(mocks, c.gameServerSynced) defer cancel() + err := c.portAllocator.Run(stop) + assert.Nil(t, err) - err := c.syncHandler("default/test") + err = c.syncHandler("default/test") assert.Nil(t, err) assert.Equal(t, 2, updateCount, "update reactor should twice") assert.True(t, podCreated, "pod should be created") @@ -229,6 +248,137 @@ func TestHealthCheck(t *testing.T) { testHTTPHealth(t, "http://localhost:8080/healthz", "ok", http.StatusOK) } +func TestMutationHandler(t *testing.T) { + t.Parallel() + + c, _ := newFakeController() + gvk := metav1.GroupVersionKind(v1alpha1.SchemeGroupVersion.WithKind("GameServer")) + + t.Run("not create", func(t *testing.T) { + review := admv1beta1.AdmissionReview{ + Request: &admv1beta1.AdmissionRequest{ + Kind: gvk, + Operation: admv1beta1.Update, + }, + } + + result, err := c.mutationHandler(review) + assert.Nil(t, err) + assert.Equal(t, review, result) + }) + + t.Run("not agones group", func(t *testing.T) { + nongroup := gvk.DeepCopy() + nongroup.Group = "broken" + + review := admv1beta1.AdmissionReview{ + Request: &admv1beta1.AdmissionRequest{ + Kind: *nongroup, + Operation: admv1beta1.Create, + }, + } + + result, err := c.mutationHandler(review) + assert.Nil(t, err) + assert.Equal(t, review, result) + }) + + t.Run("not gameserver kind", func(t *testing.T) { + nonkind := gvk.DeepCopy() + nonkind.Kind = "broken" + + review := admv1beta1.AdmissionReview{ + Request: &admv1beta1.AdmissionRequest{ + Kind: *nonkind, + Operation: admv1beta1.Create, + }, + } + + result, err := c.mutationHandler(review) + assert.Nil(t, err) + assert.Equal(t, review, result) + }) + + t.Run("gameserver defaults", func(t *testing.T) { + fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: newSingleContainerSpec()} + + raw, err := jsoniter.Marshal(fixture) + assert.Nil(t, err) + review := admv1beta1.AdmissionReview{ + Request: &admv1beta1.AdmissionRequest{ + Kind: gvk, + Operation: admv1beta1.Create, + Object: runtime.RawExtension{ + Raw: raw, + }, + }, + Response: &admv1beta1.AdmissionResponse{Allowed: true}, + } + + result, err := c.mutationHandler(review) + assert.Nil(t, err) + assert.True(t, result.Response.Allowed) + assert.Equal(t, admv1beta1.PatchTypeJSONPatch, *result.Response.PatchType) + + patch := &jsonpatch.ByPath{} + err = json.Unmarshal(result.Response.Patch, patch) + assert.Nil(t, err) + + assertContains := func(patch *jsonpatch.ByPath, op jsonpatch.JsonPatchOperation) { + found := false + for _, p := range *patch { + if assert.ObjectsAreEqualValues(p, op) { + found = true + } + } + + assert.True(t, found, "Could not find operation %#v in patch %v", op, *patch) + } + + assertContains(patch, jsonpatch.JsonPatchOperation{Operation: "add", Path: "/metadata/finalizers", Value: []interface{}{"stable.agones.dev"}}) + assertContains(patch, jsonpatch.JsonPatchOperation{Operation: "add", Path: "/spec/protocol", Value: "UDP"}) + }) + + t.Run("invalid gameserver", func(t *testing.T) { + fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: v1alpha1.GameServerSpec{ + Container: "NOPE!", + ContainerPort: 7777, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "container", Image: "container/image"}, + {Name: "container2", Image: "container/image"}, + }, + }, + }, + }, + } + raw, err := jsoniter.Marshal(fixture) + assert.Nil(t, err) + review := admv1beta1.AdmissionReview{ + Request: &admv1beta1.AdmissionRequest{ + Kind: gvk, + Operation: admv1beta1.Create, + Object: runtime.RawExtension{ + Raw: raw, + }, + }, + Response: &admv1beta1.AdmissionResponse{Allowed: true}, + } + + result, err := c.mutationHandler(review) + assert.Nil(t, err) + assert.False(t, result.Response.Allowed) + assert.Equal(t, metav1.StatusFailure, review.Response.Result.Status) + assert.Equal(t, metav1.StatusReasonInvalid, review.Response.Result.Reason) + assert.Equal(t, review.Request.Kind.Kind, result.Response.Result.Details.Kind) + assert.Equal(t, review.Request.Kind.Group, result.Response.Result.Details.Group) + assert.NotEmpty(t, result.Response.Result.Details.Causes) + }) +} + func TestSyncGameServerDeletionTimestamp(t *testing.T) { t.Parallel() @@ -293,33 +443,10 @@ func TestSyncGameServerDeletionTimestamp(t *testing.T) { }) } -func TestSyncGameServerBlankState(t *testing.T) { +func TestSyncGameServerPortAllocationState(t *testing.T) { t.Parallel() - t.Run("GameServer with a blank initial state", func(t *testing.T) { - c, mocks := newFakeController() - fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, Spec: newSingleContainerSpec()} - updated := false - - mocks.agonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { - updated = true - ua := action.(k8stesting.UpdateAction) - gs := ua.GetObject().(*v1alpha1.GameServer) - assert.Equal(t, fixture.ObjectMeta.Name, gs.ObjectMeta.Name) - assert.Equal(t, fixture.ObjectMeta.Namespace, gs.ObjectMeta.Namespace) - return true, gs, nil - }) - - result, err := c.syncGameServerBlankState(fixture) - assert.Nil(t, err, "sync should not error") - assert.True(t, updated, "update should occur") - assert.Equal(t, fixture.ObjectMeta.Name, result.ObjectMeta.Name) - assert.Equal(t, fixture.ObjectMeta.Namespace, result.ObjectMeta.Namespace) - assert.Equal(t, v1alpha1.Creating, result.Status.State) - assert.Equal(t, fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, v1alpha1.Creating, "Defaults applied"), <-mocks.fakeRecorder.Events) - }) - - t.Run("Gameserver with dynamic port state", func(t *testing.T) { + t.Run("Gameserver with port allocation state", func(t *testing.T) { t.Parallel() c, mocks := newFakeController() fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, @@ -331,7 +458,9 @@ func TestSyncGameServerBlankState(t *testing.T) { }, }, }, + Status: v1alpha1.GameServerStatus{State: v1alpha1.PortAllocation}, } + fixture.ApplyDefaults() mocks.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { return true, &corev1.NodeList{Items: []corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}}}, nil }) @@ -355,7 +484,7 @@ func TestSyncGameServerBlankState(t *testing.T) { err := c.portAllocator.Run(stop) assert.Nil(t, err) - result, err := c.syncGameServerBlankState(fixture) + result, err := c.syncGameServerPortAllocationState(fixture) assert.Nil(t, err, "sync should not error") assert.True(t, updated, "update should occur") assert.Equal(t, v1alpha1.Dynamic, result.Spec.PortPolicy) @@ -365,13 +494,13 @@ func TestSyncGameServerBlankState(t *testing.T) { t.Run("Gameserver with unknown state", func(t *testing.T) { testNoChange(t, "Unknown", func(c *Controller, fixture *v1alpha1.GameServer) (*v1alpha1.GameServer, error) { - return c.syncGameServerBlankState(fixture) + return c.syncGameServerPortAllocationState(fixture) }) }) t.Run("GameServer with non zero deletion datetime", func(t *testing.T) { testWithNonZeroDeletionTimestamp(t, v1alpha1.Shutdown, func(c *Controller, fixture *v1alpha1.GameServer) (*v1alpha1.GameServer, error) { - return c.syncGameServerBlankState(fixture) + return c.syncGameServerPortAllocationState(fixture) }) }) } @@ -778,7 +907,8 @@ func testWithNonZeroDeletionTimestamp(t *testing.T, state v1alpha1.State, f func // newFakeController returns a controller, backed by the fake Clientset func newFakeController() (*Controller, mocks) { m := newMocks() - c := NewController(10, 20, "sidecar:dev", false, + wh := webhooks.NewWebHook("", "") + c := NewController(wh, 10, 20, "sidecar:dev", false, m.kubeClient, m.kubeInformationFactory, m.extClient, m.agonesClient, m.agonesInformerFactory) c.recorder = m.fakeRecorder return c, m diff --git a/pkg/gameservers/portallocator.go b/pkg/gameservers/portallocator.go index 86dc261c13..4c79b3a812 100644 --- a/pkg/gameservers/portallocator.go +++ b/pkg/gameservers/portallocator.go @@ -136,6 +136,13 @@ func (pa *PortAllocator) Allocate() (int32, error) { return -1, ErrPortNotFound } +// DeAllocate marks the given port as no longer allocated +func (pa *PortAllocator) DeAllocate(port int32) { + pa.mutex.Lock() + defer pa.mutex.Unlock() + pa.portAllocations = setPortAllocation(port, pa.portAllocations, false) +} + // syncAddNode adds another node port section // to the available ports func (pa *PortAllocator) syncAddNode(obj interface{}) { @@ -156,12 +163,9 @@ func (pa *PortAllocator) syncAddNode(obj interface{}) { // syncDeleteGameServer when a GameServer Pod is deleted // make the HostPort available func (pa *PortAllocator) syncDeleteGameServer(object interface{}) { - pa.mutex.Lock() - defer pa.mutex.Unlock() - gs := object.(*v1alpha1.GameServer) logrus.WithField("gs", gs).Info("syncing deleted GameServer") - pa.portAllocations = setPortAllocation(gs.Spec.HostPort, pa.portAllocations, false) + pa.DeAllocate(gs.Spec.HostPort) } // syncPortAllocations syncs the pod, node and gameserver caches then diff --git a/pkg/gameservers/portallocator_test.go b/pkg/gameservers/portallocator_test.go index 86119e051b..cb1bb9031d 100644 --- a/pkg/gameservers/portallocator_test.go +++ b/pkg/gameservers/portallocator_test.go @@ -123,6 +123,30 @@ func TestPortAllocatorMultithreadAllocate(t *testing.T) { wg.Wait() } +func TestPortAllocatorDeAllocate(t *testing.T) { + t.Parallel() + + m := newMocks() + pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonesInformerFactory) + nodes := []corev1.Node{n1, n2, n3} + m.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + nl := &corev1.NodeList{Items: nodes} + return true, nl, nil + }) + stop, cancel := startInformers(m) + defer cancel() + err := pa.Run(stop) + assert.Nil(t, err) + + port, err := pa.Allocate() + assert.Nil(t, err) + assert.True(t, port >= 10) + assert.Equal(t, 1, countAllocatedPorts(pa, port)) + + pa.DeAllocate(port) + assert.Equal(t, 0, countAllocatedPorts(pa, port)) +} + func TestPortAllocatorSyncPortAllocations(t *testing.T) { t.Parallel() diff --git a/pkg/webhooks/webhooks.go b/pkg/webhooks/webhooks.go new file mode 100644 index 0000000000..30b22525c3 --- /dev/null +++ b/pkg/webhooks/webhooks.go @@ -0,0 +1,121 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package webhooks manages and receives Kubernetes Webhooks +package webhooks + +import ( + "encoding/json" + "net/http" + + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/api/admission/v1beta1" +) + +// Handler handles a webhook's AdmissionReview coming in, and will return the +// AdmissionReview that will be the return value of the webhook +type Handler func(review v1beta1.AdmissionReview) (v1beta1.AdmissionReview, error) + +// Server is a http server interface to enable easier testing +type Server interface { + Close() error + ListenAndServeTLS(certFile, keyFile string) error +} + +// WebHook manage Kubernetes webhooks +type WebHook struct { + mux *http.ServeMux + server Server + certFile string + keyFile string + handlers map[string][]Handler +} + +// NewWebHook returns a Kubernetes webhook manager +func NewWebHook(certFile, keyFile string) *WebHook { + mux := http.NewServeMux() + server := http.Server{ + Addr: ":8081", + Handler: mux, + } + + return &WebHook{ + mux: mux, + server: &server, + certFile: certFile, + keyFile: keyFile, + handlers: map[string][]Handler{}, + } +} + +// Run runs the webhook server, starting a https listener. +// Will block on stop channel +func (wh *WebHook) Run(stop <-chan struct{}) error { + go func() { + <-stop + wh.server.Close() // nolint: errcheck + }() + + logrus.WithField("webook", wh).Infof("webhook: https server started") + + err := wh.server.ListenAndServeTLS(wh.certFile, wh.keyFile) + if err == http.ErrServerClosed { + logrus.WithError(err).Info("webhook: https server closed") + } + + return errors.Wrap(err, "Could not listen on :8081") +} + +// AddHandler adds a handler for a given path +func (wh *WebHook) AddHandler(path string, h Handler) { + if len(wh.handlers[path]) == 0 { + wh.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + err := wh.handle(path, w, r) + if err != nil { + runtime.HandleError(logrus.WithField("url", r.URL), err) + w.WriteHeader(http.StatusInternalServerError) + } + }) + } + wh.handlers[path] = append(wh.handlers[path], h) +} + +// handle Handles http requests for webhooks +func (wh *WebHook) handle(path string, w http.ResponseWriter, r *http.Request) error { + logrus.WithField("path", path).Info("running webhook") + var review v1beta1.AdmissionReview + err := json.NewDecoder(r.Body).Decode(&review) + if err != nil { + return errors.Wrapf(err, "error decoding decoding json for path %v", path) + } + + // set it to true, in case there are no handlers + if review.Response == nil { + review.Response = &v1beta1.AdmissionResponse{Allowed: true} + } + for _, h := range wh.handlers[path] { + review, err = h(review) + if err != nil { + return errors.Wrapf(err, "error with webhook handler for path %v", path) + } + } + err = json.NewEncoder(w).Encode(review) + if err != nil { + return errors.Wrapf(err, "error decoding encoding json for path %v", path) + } + + return nil +} diff --git a/pkg/webhooks/webhooks_test.go b/pkg/webhooks/webhooks_test.go new file mode 100644 index 0000000000..81636f3a9e --- /dev/null +++ b/pkg/webhooks/webhooks_test.go @@ -0,0 +1,98 @@ +// Copyright 2018 Google Inc. JSON Patch allows you to generate JSON that describes changes you want to make to a document, so you don't have to send the whole doc. Here's an example of the patch format:

```json
[
  { "op": "replace", "path": "/baz", "value": "boo" },
  { "op": "add", "path": "/hello", "value": ["world"] },
  { "op": "remove", "path": "/foo"}
]

```
The API is super simple
#example
```go
package main

import (
	"fmt"
	"github.com/mattbaird/jsonpatch"
)

var simpleA = `{"a":100, "b":200, "c":"hello"}`
var simpleB = `{"a":100, "b":200, "c":"goodbye"}`

func main() {
	patch, e := jsonpatch.CreatePatch([]byte(simpleA), []byte(simpleA))
	if e != nil {
		fmt.Printf("Error creating JSON patch:%v", e)
		return
	}
	for _, operation := range patch {
		fmt.Printf("%s\n", operation.Json())
	}
}
``` This code needs more tests, as it's a highly recursive, type-fiddly monster. Both are to be given as json encoded content. +// The function will return an array of JsonPatchOperations +// +// An error will be returned if any of the two documents are invalid. +func CreatePatch(a, b []byte) ([]JsonPatchOperation, error) { + aI := map[string]interface{}{} + bI := map[string]interface{}{} + err := json.Unmarshal(a, &aI) + if err != nil { + return nil, errBadJSONDoc + } + err = json.Unmarshal(b, &bI) + if err != nil { + return nil, errBadJSONDoc + } + return diff(aI, bI, "", []JsonPatchOperation{}) +} + +// Returns true if the values matches (must be json types) +// The types of the values must match, otherwise it will always return false +// If two map[string]interface{} are given, all elements must match. +func matchesValue(av, bv interface{}) bool { + if reflect.TypeOf(av) != reflect.TypeOf(bv) { + return false + } + switch at := av.(type) { + case string: + bt := bv.(string) + if bt == at { + return true + } + case float64: + bt := bv.(float64) + if bt == at { + return true + } + case bool: + bt := bv.(bool) + if bt == at { + return true + } + case map[string]interface{}: + bt := bv.(map[string]interface{}) + for key := range at { + if !matchesValue(at[key], bt[key]) { + return false + } + } + for key := range bt { + if !matchesValue(at[key], bt[key]) { + return false + } + } + return true + case []interface{}: + bt := bv.([]interface{}) + if len(bt) != len(at) { + return false + } + for key := range at { + if !matchesValue(at[key], bt[key]) { + return false + } + } + for key := range bt { + if !matchesValue(at[key], bt[key]) { + return false + } + } + return true + } + return false +} + +// From http://tools.ietf.org/html/rfc6901#section-4 : +// +// Evaluation of each reference token begins by decoding any escaped +// character sequence. This is performed by first transforming any +// occurrence of the sequence '~1' to '/', and then transforming any +// occurrence of the sequence '~0' to '~'. +// TODO decode support: +// var rfc6901Decoder = strings.NewReplacer("~1", "/", "~0", "~") + +var rfc6901Encoder = strings.NewReplacer("~", "~0", "/", "~1") + +func makePath(path string, newPart interface{}) string { + key := rfc6901Encoder.Replace(fmt.Sprintf("%v", newPart)) + if path == "" { + return "/" + key + } + if strings.HasSuffix(path, "/") { + return path + key + } + return path + "/" + key +} + +// diff returns the (recursive) difference between a and b as an array of JsonPatchOperations. +func diff(a, b map[string]interface{}, path string, patch []JsonPatchOperation) ([]JsonPatchOperation, error) { + for key, bv := range b { + p := makePath(path, key) + av, ok := a[key] + // value was added + if !ok { + patch = append(patch, NewPatch("add", p, bv)) + continue + } + // If types have changed, replace completely + if reflect.TypeOf(av) != reflect.TypeOf(bv) { + patch = append(patch, NewPatch("replace", p, bv)) + continue + } + // Types are the same, compare values + var err error + patch, err = handleValues(av, bv, p, patch) + if err != nil { + return nil, err + } + } + // Now add all deleted values as nil + for key := range a { + _, found := b[key] + if !found { + p := makePath(path, key) + + patch = append(patch, NewPatch("remove", p, nil)) + } + } + return patch, nil +} + +func handleValues(av, bv interface{}, p string, patch []JsonPatchOperation) ([]JsonPatchOperation, error) { + var err error + switch at := av.(type) { + case map[string]interface{}: + bt := bv.(map[string]interface{}) + patch, err = diff(at, bt, p, patch) + if err != nil { + return nil, err + } + case string, float64, bool: + if !matchesValue(av, bv) { + patch = append(patch, NewPatch("replace", p, bv)) + } + case []interface{}: + bt, ok := bv.([]interface{}) + if !ok { + // array replaced by non-array + patch = append(patch, NewPatch("replace", p, bv)) + } else if len(at) != len(bt) { + // arrays are not the same length + patch = append(patch, compareArray(at, bt, p)...) + + } else { + for i := range bt { + patch, err = handleValues(at[i], bt[i], makePath(p, i), patch) + if err != nil { + return nil, err + } + } + } + case nil: + switch bv.(type) { + case nil: + // Both nil, fine. + default: + patch = append(patch, NewPatch("add", p, bv)) + } + default: + panic(fmt.Sprintf("Unknown type:%T ", av)) + } + return patch, nil +} + +func compareArray(av, bv []interface{}, p string) []JsonPatchOperation { + retval := []JsonPatchOperation{} + // var err error + for i, v := range av { + found := false + for _, v2 := range bv { + if reflect.DeepEqual(v, v2) { + found = true + break + } + } + if !found { + retval = append(retval, NewPatch("remove", makePath(p, i), nil)) + } + } + + for i, v := range bv { + found := false + for _, v2 := range av { + if reflect.DeepEqual(v, v2) { + found = true + break + } + } + if 