Skip to content

Commit

Permalink
sdkserver: fix race condition in setLabel, setAnnotation, and setState
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingxin-Jiang committed Feb 13, 2019
1 parent 3952c1a commit 96a26dc
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 69 deletions.
124 changes: 63 additions & 61 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type SDKServer struct {
connectedStreams []sdk.SDK_WatchGameServerServer
stop <-chan struct{}
recorder record.EventRecorder
gsLabels map[string]string
gsAnnotations map[string]string
gsState stablev1alpha1.GameServerState
gsUpdateMutex sync.RWMutex
}

// NewSDKServer creates a SDKServer that sets up an
Expand Down Expand Up @@ -107,6 +111,9 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
healthMutex: sync.RWMutex{},
healthFailureCount: 0,
streamMutex: sync.RWMutex{},
gsLabels: map[string]string{},
gsAnnotations: map[string]string{},
gsUpdateMutex: sync.RWMutex{},
}

s.informerFactory = factory
Expand Down Expand Up @@ -202,38 +209,30 @@ func (s *SDKServer) Run(stop <-chan struct{}) error {
return nil
}

// syncGameServer synchronises the GameServer with the
// requested operations
// takes a key in the format of {operation}/{data}
// syncGameServer synchronises the GameServer with the requested operations.
// The format of the key is {operation}. To prevent old operation data from
// overwriting the new one, the operation data is persisted in SDKServer.
func (s *SDKServer) syncGameServer(key string) error {
op := strings.Split(key, "/")
rest := op[1:]

switch Operation(op[0]) {
switch Operation(key) {
case updateState:
return s.syncState(rest)
return s.updateState()
case updateLabel:
return s.syncLabel(rest)
return s.updateLabels()
case updateAnnotation:
return s.syncAnnotation(rest)
return s.updateAnnotations()
}

return errors.Errorf("could not sync game server key: %s", key)
}

// syncState converts the string array into values for updateState
func (s *SDKServer) syncState(rest []string) error {
if len(rest) == 0 {
return errors.New("could not sync state, as not state provided")
// updateState sets the GameServer Status's state to the one persisted in SDKServer,
// i.e. SDKServer.gsState.
func (s *SDKServer) updateState() error {
s.logger.WithField("state", s.gsState).Info("Updating state")
if len(s.gsState) == 0 {
return errors.Errorf("could not update GameServer %s/%s to empty state", s.namespace, s.gameServerName)
}

return s.updateState(stablev1alpha1.GameServerState(rest[0]))
}

// updateState sets the GameServer Status's state to the state
// that has been passed through
func (s *SDKServer) updateState(state stablev1alpha1.GameServerState) error {
s.logger.WithField("state", state).Info("Updating state")
gameServers := s.gameServerGetter.GameServers(s.namespace)
gs, err := s.gameServer()
if err != nil {
Expand All @@ -246,74 +245,67 @@ func (s *SDKServer) updateState(state stablev1alpha1.GameServerState) error {
return nil
}

gs.Status.State = state
s.gsUpdateMutex.RLock()
gs.Status.State = s.gsState
s.gsUpdateMutex.RUnlock()
_, err = gameServers.Update(gs)

// state specific work here
if gs.Status.State == stablev1alpha1.GameServerStateUnhealthy {
s.recorder.Event(gs, corev1.EventTypeWarning, string(gs.Status.State), "No longer healthy")
}

return errors.Wrapf(err, "could not update GameServer %s/%s to state %s", s.namespace, s.gameServerName, state)
return errors.Wrapf(err, "could not update GameServer %s/%s to state %s", s.namespace, s.gameServerName, gs.Status.State)
}

func (s *SDKServer) gameServer() (*stablev1alpha1.GameServer, error) {
gs, err := s.gameServerLister.GameServers(s.namespace).Get(s.gameServerName)
return gs, errors.Wrapf(err, "could not retrieve GameServer %s/%s", s.namespace, s.gameServerName)
}

// syncLabel converts the string array values into values for
// updateLabel
func (s *SDKServer) syncLabel(rest []string) error {
if len(rest) < 2 {
return errors.Errorf("could not sync label: %#v", rest)
}

return s.updateLabel(rest[0], rest[1])
}

// updateLabel updates the label on this GameServer, with the prefix of
// "stable.agones.dev/sdk-"
func (s *SDKServer) updateLabel(key, value string) error {
s.logger.WithField("key", key).WithField("value", value).Info("updating label")
// updateLabels updates the labels on this GameServer to the ones persisted in SDKServer,
// i.e. SDKServer.gsLabels, with the prefix of "stable.agones.dev/sdk-"
func (s *SDKServer) updateLabels() error {
s.logger.WithField("labels", s.gsLabels).Info("updating label")
gs, err := s.gameServer()
if err != nil {
return err
}

gsCopy := gs.DeepCopy()
if gsCopy.ObjectMeta.Labels == nil {

s.gsUpdateMutex.RLock()
if len(s.gsLabels) > 0 && gsCopy.ObjectMeta.Labels == nil {
gsCopy.ObjectMeta.Labels = map[string]string{}
}
gsCopy.ObjectMeta.Labels[metadataPrefix+key] = value
for k, v := range s.gsLabels {
gsCopy.ObjectMeta.Labels[metadataPrefix+k] = v
}
s.gsUpdateMutex.RUnlock()

_, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy)
return err
}

// syncAnnotation converts the string array values into values for
// updateAnnotation
func (s *SDKServer) syncAnnotation(rest []string) error {
if len(rest) < 2 {
return errors.Errorf("could not sync annotation: %#v", rest)
}

return s.updateAnnotation(rest[0], rest[1])
}

// updateAnnotation updates the Annotation on this GameServer, with the prefix of
// "stable.agones.dev/sdk-"
func (s *SDKServer) updateAnnotation(key, value string) error {
// updateAnnotations updates the Annotations on this GameServer to the ones persisted in SDKServer,
// i.e. SDKServer.gsAnnotations, with the prefix of "stable.agones.dev/sdk-"
func (s *SDKServer) updateAnnotations() error {
s.logger.WithField("annotations", s.gsAnnotations).Info("updating annotation")
gs, err := s.gameServer()
if err != nil {
return err
}

gsCopy := gs.DeepCopy()
if gsCopy.ObjectMeta.Annotations == nil {

s.gsUpdateMutex.RLock()
if len(s.gsAnnotations) > 0 && gsCopy.ObjectMeta.Annotations == nil {
gsCopy.ObjectMeta.Annotations = map[string]string{}
}
gsCopy.ObjectMeta.Annotations[metadataPrefix+key] = value
for k, v := range s.gsAnnotations {
gsCopy.ObjectMeta.Annotations[metadataPrefix+k] = v
}
s.gsUpdateMutex.RUnlock()

_, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy)
return err
Expand All @@ -322,8 +314,10 @@ func (s *SDKServer) updateAnnotation(key, value string) error {
// enqueueState enqueue a State change request into the
// workerqueue
func (s *SDKServer) enqueueState(state stablev1alpha1.GameServerState) {
key := string(updateState) + "/" + string(state)
s.workerqueue.Enqueue(cache.ExplicitKey(key))
s.gsUpdateMutex.Lock()
s.gsState = state
s.gsUpdateMutex.Unlock()
s.workerqueue.Enqueue(cache.ExplicitKey(string(updateState)))
}

// Ready enters the RequestReady state change for this GameServer into
Expand Down Expand Up @@ -363,17 +357,25 @@ func (s *SDKServer) Health(stream sdk.SDK_HealthServer) error {
// metdata
func (s *SDKServer) SetLabel(_ context.Context, kv *sdk.KeyValue) (*sdk.Empty, error) {
s.logger.WithField("values", kv).Info("Adding SetLabel to queue")
key := string(updateLabel) + "/" + kv.Key + "/" + kv.Value
s.workerqueue.Enqueue(cache.ExplicitKey(key))

s.gsUpdateMutex.Lock()
s.gsLabels[kv.Key] = kv.Value
s.gsUpdateMutex.Unlock()

s.workerqueue.Enqueue(cache.ExplicitKey(string(updateLabel)))
return &sdk.Empty{}, nil
}

// SetAnnotation adds the Key/Value to be used to set the annotations with the metadataPrefix to the `GameServer`
// metdata
func (s *SDKServer) SetAnnotation(_ context.Context, kv *sdk.KeyValue) (*sdk.Empty, error) {
s.logger.WithField("values", kv).Info("Adding SetLabel to queue")
key := string(updateAnnotation) + "/" + kv.Key + "/" + kv.Value
s.workerqueue.Enqueue(cache.ExplicitKey(key))
s.logger.WithField("values", kv).Info("Adding SetAnnotation to queue")

s.gsUpdateMutex.Lock()
s.gsAnnotations[kv.Key] = kv.Value
s.gsUpdateMutex.Unlock()

s.workerqueue.Enqueue(cache.ExplicitKey(string(updateAnnotation)))
return &sdk.Empty{}, nil
}

Expand Down
46 changes: 38 additions & 8 deletions pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,28 @@ func TestSidecarRun(t *testing.T) {
},
"label": {
f: func(sc *SDKServer, ctx context.Context) {
_, err := sc.SetLabel(ctx, &sdk.KeyValue{Key: "foo", Value: "bar"})
_, err := sc.SetLabel(ctx, &sdk.KeyValue{Key: "foo", Value: "value-foo"})
assert.Nil(t, err)
_, err = sc.SetLabel(ctx, &sdk.KeyValue{Key: "bar", Value: "value-bar"})
assert.Nil(t, err)
},
expected: expected{
labels: map[string]string{metadataPrefix + "foo": "bar"},
labels: map[string]string{
metadataPrefix + "foo": "value-foo",
metadataPrefix + "bar": "value-bar"},
},
},
"annotation": {
f: func(sc *SDKServer, ctx context.Context) {
_, err := sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test", Value: "annotation"})
_, err := sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test-1", Value: "annotation-1"})
assert.Nil(t, err)
_, err = sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test-2", Value: "annotation-2"})
assert.Nil(t, err)
},
expected: expected{
annotations: map[string]string{metadataPrefix + "test": "annotation"},
annotations: map[string]string{
metadataPrefix + "test-1": "annotation-1",
metadataPrefix + "test-2": "annotation-2"},
},
},
}
Expand Down Expand Up @@ -176,24 +184,40 @@ func TestSDKServerSyncGameServer(t *testing.T) {
annotations map[string]string
}

type scData struct {
gsState v1alpha1.GameServerState
gsLabels map[string]string
gsAnnotations map[string]string
}

fixtures := map[string]struct {
expected expected
key string
scData scData
}{
"ready": {
key: string(updateState) + "/" + string(v1alpha1.GameServerStateReady),
key: string(updateState),
scData: scData{
gsState: v1alpha1.GameServerStateReady,
},
expected: expected{
state: v1alpha1.GameServerStateReady,
},
},
"label": {
key: string(updateLabel) + "/foo/bar",
key: string(updateLabel),
scData: scData{
gsLabels: map[string]string{"foo": "bar"},
},
expected: expected{
labels: map[string]string{metadataPrefix + "foo": "bar"},
},
},
"annotation": {
key: string(updateAnnotation) + "/test/annotation",
key: string(updateAnnotation),
scData: scData{
gsAnnotations: map[string]string{"test": "annotation"},
},
expected: expected{
annotations: map[string]string{metadataPrefix + "test": "annotation"},
},
Expand All @@ -205,6 +229,11 @@ func TestSDKServerSyncGameServer(t *testing.T) {
m := agtesting.NewMocks()
sc, err := defaultSidecar(m)
assert.Nil(t, err)

sc.gsState = v.scData.gsState
sc.gsLabels = v.scData.gsLabels
sc.gsAnnotations = v.scData.gsAnnotations

updated := false

m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
Expand Down Expand Up @@ -253,6 +282,7 @@ func TestSidecarUpdateState(t *testing.T) {
m := agtesting.NewMocks()
sc, err := defaultSidecar(m)
assert.Nil(t, err)
sc.gsState = v1alpha1.GameServerStateReady

updated := false

Expand All @@ -275,7 +305,7 @@ func TestSidecarUpdateState(t *testing.T) {
sc.informerFactory.Start(stop)
assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced))

err = sc.updateState(v1alpha1.GameServerStateReady)
err = sc.updateState()
assert.Nil(t, err)
assert.False(t, updated)
})
Expand Down

0 comments on commit 96a26dc

Please sign in to comment.