Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdkserver: fix race condition in SDK.SetLabel and SDK.SetAnnotation (issue #455) #588

Merged
merged 1 commit into from
Feb 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 63 additions & 61 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type SDKServer struct {
connectedStreams []sdk.SDK_WatchGameServerServer
stop <-chan struct{}
recorder record.EventRecorder
gsLabels map[string]string
gsAnnotations map[string]string
gsState stablev1alpha1.GameServerState
gsUpdateMutex sync.RWMutex
}

// NewSDKServer creates a SDKServer that sets up an
Expand Down Expand Up @@ -107,6 +111,9 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
healthMutex: sync.RWMutex{},
healthFailureCount: 0,
streamMutex: sync.RWMutex{},
gsLabels: map[string]string{},
gsAnnotations: map[string]string{},
gsUpdateMutex: sync.RWMutex{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no need to initialize mutex like that (default value is fine). feel free to remove all of those.

}

s.informerFactory = factory
Expand Down Expand Up @@ -202,38 +209,30 @@ func (s *SDKServer) Run(stop <-chan struct{}) error {
return nil
}

// syncGameServer synchronises the GameServer with the
// requested operations
// takes a key in the format of {operation}/{data}
// syncGameServer synchronises the GameServer with the requested operations.
// The format of the key is {operation}. To prevent old operation data from
// overwriting the new one, the operation data is persisted in SDKServer.
func (s *SDKServer) syncGameServer(key string) error {
op := strings.Split(key, "/")
rest := op[1:]

switch Operation(op[0]) {
switch Operation(key) {
case updateState:
return s.syncState(rest)
return s.updateState()
case updateLabel:
return s.syncLabel(rest)
return s.updateLabels()
case updateAnnotation:
return s.syncAnnotation(rest)
return s.updateAnnotations()
}

return errors.Errorf("could not sync game server key: %s", key)
}

// syncState converts the string array into values for updateState
func (s *SDKServer) syncState(rest []string) error {
if len(rest) == 0 {
return errors.New("could not sync state, as not state provided")
// updateState sets the GameServer Status's state to the one persisted in SDKServer,
// i.e. SDKServer.gsState.
func (s *SDKServer) updateState() error {
s.logger.WithField("state", s.gsState).Info("Updating state")
if len(s.gsState) == 0 {
return errors.Errorf("could not update GameServer %s/%s to empty state", s.namespace, s.gameServerName)
}

return s.updateState(stablev1alpha1.GameServerState(rest[0]))
}

// updateState sets the GameServer Status's state to the state
// that has been passed through
func (s *SDKServer) updateState(state stablev1alpha1.GameServerState) error {
s.logger.WithField("state", state).Info("Updating state")
gameServers := s.gameServerGetter.GameServers(s.namespace)
gs, err := s.gameServer()
if err != nil {
Expand All @@ -246,74 +245,67 @@ func (s *SDKServer) updateState(state stablev1alpha1.GameServerState) error {
return nil
}

gs.Status.State = state
s.gsUpdateMutex.RLock()
gs.Status.State = s.gsState
s.gsUpdateMutex.RUnlock()
_, err = gameServers.Update(gs)

// state specific work here
if gs.Status.State == stablev1alpha1.GameServerStateUnhealthy {
s.recorder.Event(gs, corev1.EventTypeWarning, string(gs.Status.State), "No longer healthy")
}

return errors.Wrapf(err, "could not update GameServer %s/%s to state %s", s.namespace, s.gameServerName, state)
return errors.Wrapf(err, "could not update GameServer %s/%s to state %s", s.namespace, s.gameServerName, gs.Status.State)
}

func (s *SDKServer) gameServer() (*stablev1alpha1.GameServer, error) {
gs, err := s.gameServerLister.GameServers(s.namespace).Get(s.gameServerName)
return gs, errors.Wrapf(err, "could not retrieve GameServer %s/%s", s.namespace, s.gameServerName)
}

// syncLabel converts the string array values into values for
// updateLabel
func (s *SDKServer) syncLabel(rest []string) error {
if len(rest) < 2 {
return errors.Errorf("could not sync label: %#v", rest)
}

return s.updateLabel(rest[0], rest[1])
}

// updateLabel updates the label on this GameServer, with the prefix of
// "stable.agones.dev/sdk-"
func (s *SDKServer) updateLabel(key, value string) error {
s.logger.WithField("key", key).WithField("value", value).Info("updating label")
// updateLabels updates the labels on this GameServer to the ones persisted in SDKServer,
// i.e. SDKServer.gsLabels, with the prefix of "stable.agones.dev/sdk-"
func (s *SDKServer) updateLabels() error {
s.logger.WithField("labels", s.gsLabels).Info("updating label")
gs, err := s.gameServer()
if err != nil {
return err
}

gsCopy := gs.DeepCopy()
if gsCopy.ObjectMeta.Labels == nil {

s.gsUpdateMutex.RLock()
if len(s.gsLabels) > 0 && gsCopy.ObjectMeta.Labels == nil {
gsCopy.ObjectMeta.Labels = map[string]string{}
}
gsCopy.ObjectMeta.Labels[metadataPrefix+key] = value
for k, v := range s.gsLabels {
gsCopy.ObjectMeta.Labels[metadataPrefix+k] = v
}
s.gsUpdateMutex.RUnlock()

_, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy)
return err
}

// syncAnnotation converts the string array values into values for
// updateAnnotation
func (s *SDKServer) syncAnnotation(rest []string) error {
if len(rest) < 2 {
return errors.Errorf("could not sync annotation: %#v", rest)
}

return s.updateAnnotation(rest[0], rest[1])
}

// updateAnnotation updates the Annotation on this GameServer, with the prefix of
// "stable.agones.dev/sdk-"
func (s *SDKServer) updateAnnotation(key, value string) error {
// updateAnnotations updates the Annotations on this GameServer to the ones persisted in SDKServer,
// i.e. SDKServer.gsAnnotations, with the prefix of "stable.agones.dev/sdk-"
func (s *SDKServer) updateAnnotations() error {
s.logger.WithField("annotations", s.gsAnnotations).Info("updating annotation")
gs, err := s.gameServer()
if err != nil {
return err
}

gsCopy := gs.DeepCopy()
if gsCopy.ObjectMeta.Annotations == nil {

s.gsUpdateMutex.RLock()
if len(s.gsAnnotations) > 0 && gsCopy.ObjectMeta.Annotations == nil {
gsCopy.ObjectMeta.Annotations = map[string]string{}
}
gsCopy.ObjectMeta.Annotations[metadataPrefix+key] = value
for k, v := range s.gsAnnotations {
gsCopy.ObjectMeta.Annotations[metadataPrefix+k] = v
}
s.gsUpdateMutex.RUnlock()

_, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy)
return err
Expand All @@ -322,8 +314,10 @@ func (s *SDKServer) updateAnnotation(key, value string) error {
// enqueueState enqueue a State change request into the
// workerqueue
func (s *SDKServer) enqueueState(state stablev1alpha1.GameServerState) {
key := string(updateState) + "/" + string(state)
s.workerqueue.Enqueue(cache.ExplicitKey(key))
s.gsUpdateMutex.Lock()
s.gsState = state
s.gsUpdateMutex.Unlock()
s.workerqueue.Enqueue(cache.ExplicitKey(string(updateState)))
}

// Ready enters the RequestReady state change for this GameServer into
Expand Down Expand Up @@ -363,17 +357,25 @@ func (s *SDKServer) Health(stream sdk.SDK_HealthServer) error {
// metdata
func (s *SDKServer) SetLabel(_ context.Context, kv *sdk.KeyValue) (*sdk.Empty, error) {
s.logger.WithField("values", kv).Info("Adding SetLabel to queue")
key := string(updateLabel) + "/" + kv.Key + "/" + kv.Value
s.workerqueue.Enqueue(cache.ExplicitKey(key))

s.gsUpdateMutex.Lock()
s.gsLabels[kv.Key] = kv.Value
s.gsUpdateMutex.Unlock()

s.workerqueue.Enqueue(cache.ExplicitKey(string(updateLabel)))
return &sdk.Empty{}, nil
}

// SetAnnotation adds the Key/Value to be used to set the annotations with the metadataPrefix to the `GameServer`
// metdata
func (s *SDKServer) SetAnnotation(_ context.Context, kv *sdk.KeyValue) (*sdk.Empty, error) {
s.logger.WithField("values", kv).Info("Adding SetLabel to queue")
key := string(updateAnnotation) + "/" + kv.Key + "/" + kv.Value
s.workerqueue.Enqueue(cache.ExplicitKey(key))
s.logger.WithField("values", kv).Info("Adding SetAnnotation to queue")

s.gsUpdateMutex.Lock()
s.gsAnnotations[kv.Key] = kv.Value
s.gsUpdateMutex.Unlock()

s.workerqueue.Enqueue(cache.ExplicitKey(string(updateAnnotation)))
return &sdk.Empty{}, nil
}

Expand Down
46 changes: 38 additions & 8 deletions pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,28 @@ func TestSidecarRun(t *testing.T) {
},
"label": {
f: func(sc *SDKServer, ctx context.Context) {
_, err := sc.SetLabel(ctx, &sdk.KeyValue{Key: "foo", Value: "bar"})
_, err := sc.SetLabel(ctx, &sdk.KeyValue{Key: "foo", Value: "value-foo"})
assert.Nil(t, err)
_, err = sc.SetLabel(ctx, &sdk.KeyValue{Key: "bar", Value: "value-bar"})
assert.Nil(t, err)
},
expected: expected{
labels: map[string]string{metadataPrefix + "foo": "bar"},
labels: map[string]string{
metadataPrefix + "foo": "value-foo",
metadataPrefix + "bar": "value-bar"},
},
},
"annotation": {
f: func(sc *SDKServer, ctx context.Context) {
_, err := sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test", Value: "annotation"})
_, err := sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test-1", Value: "annotation-1"})
assert.Nil(t, err)
_, err = sc.SetAnnotation(ctx, &sdk.KeyValue{Key: "test-2", Value: "annotation-2"})
assert.Nil(t, err)
},
expected: expected{
annotations: map[string]string{metadataPrefix + "test": "annotation"},
annotations: map[string]string{
metadataPrefix + "test-1": "annotation-1",
metadataPrefix + "test-2": "annotation-2"},
},
},
}
Expand Down Expand Up @@ -176,24 +184,40 @@ func TestSDKServerSyncGameServer(t *testing.T) {
annotations map[string]string
}

type scData struct {
gsState v1alpha1.GameServerState
gsLabels map[string]string
gsAnnotations map[string]string
}

fixtures := map[string]struct {
expected expected
key string
scData scData
}{
"ready": {
key: string(updateState) + "/" + string(v1alpha1.GameServerStateReady),
key: string(updateState),
scData: scData{
gsState: v1alpha1.GameServerStateReady,
},
expected: expected{
state: v1alpha1.GameServerStateReady,
},
},
"label": {
key: string(updateLabel) + "/foo/bar",
key: string(updateLabel),
scData: scData{
gsLabels: map[string]string{"foo": "bar"},
},
expected: expected{
labels: map[string]string{metadataPrefix + "foo": "bar"},
},
},
"annotation": {
key: string(updateAnnotation) + "/test/annotation",
key: string(updateAnnotation),
scData: scData{
gsAnnotations: map[string]string{"test": "annotation"},
},
expected: expected{
annotations: map[string]string{metadataPrefix + "test": "annotation"},
},
Expand All @@ -205,6 +229,11 @@ func TestSDKServerSyncGameServer(t *testing.T) {
m := agtesting.NewMocks()
sc, err := defaultSidecar(m)
assert.Nil(t, err)

sc.gsState = v.scData.gsState
sc.gsLabels = v.scData.gsLabels
sc.gsAnnotations = v.scData.gsAnnotations

updated := false

m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
Expand Down Expand Up @@ -253,6 +282,7 @@ func TestSidecarUpdateState(t *testing.T) {
m := agtesting.NewMocks()
sc, err := defaultSidecar(m)
assert.Nil(t, err)
sc.gsState = v1alpha1.GameServerStateReady

updated := false

Expand All @@ -275,7 +305,7 @@ func TestSidecarUpdateState(t *testing.T) {
sc.informerFactory.Start(stop)
assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced))

err = sc.updateState(v1alpha1.GameServerStateReady)
err = sc.updateState()
assert.Nil(t, err)
assert.False(t, updated)
})
Expand Down