Skip to content

Commit

Permalink
Implementing cross cluster allocation request (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
pooneh-m authored and markmandel committed May 7, 2019
1 parent e1f6807 commit 93f0c3a
Show file tree
Hide file tree
Showing 3 changed files with 439 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func main() {
faController := fleetallocation.NewController(wh, allocationMutex,
kubeClient, extClient, agonesClient, agonesInformerFactory)
gasController := gameserverallocations.NewController(api, health, gsCounter, topNGSForAllocation,
kubeClient, agonesClient, agonesInformerFactory)
kubeClient, kubeInformerFactory, agonesClient, agonesInformerFactory)
fasController := fleetautoscalers.NewController(wh, health,
kubeClient, extClient, agonesClient, agonesInformerFactory)

Expand Down
123 changes: 115 additions & 8 deletions pkg/gameserverallocations/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package gameserverallocations

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -48,9 +52,11 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)
Expand All @@ -63,6 +69,12 @@ var (
ErrConflictInGameServerSelection = errors.New("The Gameserver was already allocated")
)

const (
secretClientCertName = "client-cert"
secretClientKeyName = "client-key"
secretCaCertName = "ca-cert"
)

// Controller is a the GameServerAllocation controller
type Controller struct {
baseLogger *logrus.Entry
Expand All @@ -75,6 +87,7 @@ type Controller struct {
gameServerGetter getterv1alpha1.GameServersGetter
gameServerLister listerv1alpha1.GameServerLister
allocationPolicyLister multiclusterlisterv1alpha1.GameServerAllocationPolicyLister
secretLister corev1lister.SecretLister
stop <-chan struct{}
workerqueue *workerqueue.WorkerQueue
recorder record.EventRecorder
Expand All @@ -98,6 +111,7 @@ func NewController(apiServer *apiserver.APIServer,
counter *gameservers.PerNodeCounter,
topNGameServerCnt int,
kubeClient kubernetes.Interface,
kubeInformerFactory informers.SharedInformerFactory,
agonesClient versioned.Interface,
agonesInformerFactory externalversions.SharedInformerFactory,
) *Controller {
Expand All @@ -110,6 +124,7 @@ func NewController(apiServer *apiserver.APIServer,
gameServerGetter: agonesClient.StableV1alpha1(),
gameServerLister: agonesInformer.GameServers().Lister(),
allocationPolicyLister: agonesInformerFactory.Multicluster().V1alpha1().GameServerAllocationPolicies().Lister(),
secretLister: kubeInformerFactory.Core().V1().Secrets().Lister(),
}
c.baseLogger = runtime.NewLoggerWithType(c)
c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServers, c.baseLogger, logfields.GameServerKey, stable.GroupName+".GameServerUpdateController")
Expand Down Expand Up @@ -290,19 +305,21 @@ func (c *Controller) allocateFromLocalCluster(gsa *v1alpha1.GameServerAllocation

// applyMultiClusterAllocation retrieves allocation policies and iterate on policies.
// Then allocate gameservers from local or remote cluster accordingly.
func (c *Controller) applyMultiClusterAllocation(gsa *v1alpha1.GameServerAllocation) (*v1alpha1.GameServerAllocation, error) {
var result *v1alpha1.GameServerAllocation
func (c *Controller) applyMultiClusterAllocation(gsa *v1alpha1.GameServerAllocation) (result *v1alpha1.GameServerAllocation, err error) {

selector, err := metav1.LabelSelectorAsSelector(&gsa.Spec.MultiClusterSetting.PolicySelector)
if err != nil {
return nil, err
selector := labels.Everything()
if len(gsa.Spec.MultiClusterSetting.PolicySelector.MatchLabels)+len(gsa.Spec.MultiClusterSetting.PolicySelector.MatchExpressions) != 0 {
selector, err = metav1.LabelSelectorAsSelector(&gsa.Spec.MultiClusterSetting.PolicySelector)
if err != nil {
return nil, err
}
}

policies, err := c.allocationPolicyLister.GameServerAllocationPolicies(gsa.ObjectMeta.Namespace).List(selector)
if err != nil {
return nil, err
} else if len(policies) == 0 {
return c.allocateFromLocalCluster(gsa)
return nil, errors.New("no multi-cluster allocation policy is specified")
}

it := multiclusterv1alpha1.NewConnectionInfoIterator(policies)
Expand All @@ -328,8 +345,98 @@ func (c *Controller) applyMultiClusterAllocation(gsa *v1alpha1.GameServerAllocat
// allocateFromRemoteCluster allocates gameservers from a remote cluster by making
// an http call to allocation service in that cluster.
func (c *Controller) allocateFromRemoteCluster(gsa v1alpha1.GameServerAllocation, connectionInfo *multiclusterv1alpha1.ClusterConnectionInfo, namespace string) (*v1alpha1.GameServerAllocation, error) {
// TODO: implement getting secrets and making rest call to remote cluster
return nil, nil
var gsaResult v1alpha1.GameServerAllocation

// TODO: handle converting error to apiserver error
// TODO: cache the client
client, err := c.createRemoteClusterRestClient(namespace, connectionInfo.SecretName)
if err != nil {
return nil, err
}

// Forward the game server allocation request to another cluster,
// and disable multicluster settings to avoid the target cluster
// forward the allocation request again.
gsa.Spec.MultiClusterSetting.Enabled = false
body, err := json.Marshal(gsa)
if err != nil {
return nil, err
}

// TODO: Retry on transient error --> response.StatusCode >= 500
response, err := client.Post(connectionInfo.AllocationEndpoint, "application/json", bytes.NewBuffer(body))
if err != nil {
return nil, err
}
defer response.Body.Close() // nolint: errcheck

data, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}
if response.StatusCode >= 400 {
return nil, errors.New(string(data))
}

err = json.Unmarshal(data, &gsaResult)
if err != nil {
return nil, err
}

return &gsaResult, nil
}

// createRemoteClusterRestClient creates a rest client with proper certs to make a remote call.
func (c *Controller) createRemoteClusterRestClient(namespace, secretName string) (*http.Client, error) {
clientCert, clientKey, caCert, err := c.getClientCertificates(namespace, secretName)
if err != nil {
return nil, err
}
if clientCert == nil || clientKey == nil {
return nil, fmt.Errorf("missing client certificate key pair in secret %s", secretName)
}

// Load client cert
cert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return nil, err
}

tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}}
if len(caCert) != 0 {
// Load CA cert, if provided and trust the server certificate.
// This is required for self-signed certs.
tlsConfig.RootCAs = x509.NewCertPool()
ca, err := x509.ParseCertificate(caCert)
if err != nil {
return nil, err
}
tlsConfig.RootCAs.AddCert(ca)
}

// Setup HTTPS client
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}, nil
}

// getClientCertificates returns the client certificates and CA cert for remote allocation cluster call
func (c *Controller) getClientCertificates(namespace, secretName string) (clientCert, clientKey, caCert []byte, err error) {
secret, err := c.secretLister.Secrets(namespace).Get(secretName)
if err != nil {
return nil, nil, nil, err
}
if secret == nil || len(secret.Data) == 0 {
return nil, nil, nil, fmt.Errorf("secert %s does not have data", secretName)
}

// Create http client using cert
clientCert = secret.Data[secretClientCertName]
clientKey = secret.Data[secretClientKeyName]
caCert = secret.Data[secretCaCertName]
return clientCert, clientKey, caCert, nil
}

// allocationDeserialization processes the request and namespace, and attempts to deserialise its values
Expand Down
Loading

0 comments on commit 93f0c3a

Please sign in to comment.