Skip to content

Commit

Permalink
SDKServer Player Tracking implementation (googleforgames#1507)
Browse files Browse the repository at this point in the history
Implementation of the SDK gRPC Server, with accompanying unit tests.

Work on googleforgames#1033
  • Loading branch information
markmandel authored and ilkercelikyilmaz committed Oct 23, 2020
1 parent 2f04f44 commit 465326d
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 15 deletions.
145 changes: 130 additions & 15 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/clock"
Expand All @@ -53,10 +54,12 @@ import (
type Operation string

const (
updateState Operation = "updateState"
updateLabel Operation = "updateLabel"
updateAnnotation Operation = "updateAnnotation"
updatePlayerCapacity Operation = "updatePlayerCapacity"
updateState Operation = "updateState"
updateLabel Operation = "updateLabel"
updateAnnotation Operation = "updateAnnotation"
updatePlayerCapacity Operation = "updatePlayerCapacity"
updateConnectedPlayers Operation = "updateConnectedPlayers"
playerCountUpdatePeriod time.Duration = time.Second
)

var (
Expand Down Expand Up @@ -96,6 +99,7 @@ type SDKServer struct {
reserveTimer *time.Timer
gsReserveDuration *time.Duration
gsPlayerCapacity int64
gsConnectedPlayers []string
}

// NewSDKServer creates a SDKServer that sets up an
Expand Down Expand Up @@ -129,6 +133,7 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
gsAnnotations: map[string]string{},
gsUpdateMutex: sync.RWMutex{},
gsWaitForSync: sync.WaitGroup{},
gsConnectedPlayers: []string{},
}

s.informerFactory = factory
Expand Down Expand Up @@ -230,11 +235,12 @@ func (s *SDKServer) Run(stop <-chan struct{}) error {
go wait.Until(s.runHealth, s.healthTimeout, stop)
}

// populate player capacity value
// populate player tracking values
if runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
s.gsUpdateMutex.Lock()
if gs.Status.Players != nil {
s.gsPlayerCapacity = gs.Status.Players.Capacity
s.gsConnectedPlayers = gs.Status.Players.IDs
}
s.gsUpdateMutex.Unlock()
}
Expand Down Expand Up @@ -272,6 +278,8 @@ func (s *SDKServer) syncGameServer(key string) error {
return s.updateAnnotations()
case updatePlayerCapacity:
return s.updatePlayerCapacity()
case updateConnectedPlayers:
return s.updateConnectedPlayers()
}

return errors.Errorf("could not sync game server key: %s", key)
Expand Down Expand Up @@ -547,43 +555,117 @@ func (s *SDKServer) stopReserveTimer() {
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) PlayerConnect(ctx context.Context, id *alpha.PlayerID) (*alpha.Bool, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.logger.WithField("PlayerId", id.PlayerID).Debug("Player Connected")

s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

// the player is already connected, return false.
for _, playerID := range s.gsConnectedPlayers {
if playerID == id.PlayerID {
return &alpha.Bool{Bool: false}, nil
}
}

if int64(len(s.gsConnectedPlayers)) >= s.gsPlayerCapacity {
return &alpha.Bool{}, errors.New("players are already at capacity")
}

// let's retain the original order, as it should be a smaller patch on data change
s.gsConnectedPlayers = append(s.gsConnectedPlayers, id.PlayerID)
s.workerqueue.EnqueueAfter(cache.ExplicitKey(string(updateConnectedPlayers)), playerCountUpdatePeriod)

return &alpha.Bool{Bool: true}, nil
}

// PlayerDisconnect should be called when a player disconnects.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) PlayerDisconnect(ctx context.Context, id *alpha.PlayerID) (*alpha.Bool, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.logger.WithField("PlayerId", id.PlayerID).Debug("Player Disconnected")

s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

found := -1
for i, playerID := range s.gsConnectedPlayers {
if playerID == id.PlayerID {
found = i
break
}
}
if found == -1 {
return &alpha.Bool{Bool: false}, nil
}

// let's retain the original order, as it should be a smaller patch on data change
s.gsConnectedPlayers = append(s.gsConnectedPlayers[:found], s.gsConnectedPlayers[found+1:]...)
s.workerqueue.EnqueueAfter(cache.ExplicitKey(string(updateConnectedPlayers)), playerCountUpdatePeriod)

return &alpha.Bool{Bool: true}, nil
}

// IsPlayerConnected returns if the player ID is connected or not
// IsPlayerConnected returns if the playerID is currently connected to the GameServer.
// This is always accurate, even if the value hasn’t been updated to the GameServer status yet.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) IsPlayerConnected(ctx context.Context, id *alpha.PlayerID) (*alpha.Bool, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()

result := &alpha.Bool{Bool: false}

for _, playerID := range s.gsConnectedPlayers {
if playerID == id.PlayerID {
result.Bool = true
break
}
}

return result, nil
}

// GetConnectedPlayers returns if the players are connected or not
// GetConnectedPlayers returns the list of the currently connected player ids.
// This is always accurate, even if the value hasn’t been updated to the GameServer status yet.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) GetConnectedPlayers(ctx context.Context, empty *alpha.Empty) (*alpha.PlayerIDList, error) {
panic("implement me")
func (s *SDKServer) GetConnectedPlayers(c context.Context, empty *alpha.Empty) (*alpha.PlayerIDList, error) {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()

return &alpha.PlayerIDList{List: s.gsConnectedPlayers}, nil
}

// GetPlayerCount returns the current player count.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) GetPlayerCount(ctx context.Context, _ *alpha.Empty) (*alpha.Count, error) {
panic("implement me")
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()
return &alpha.Count{Count: int64(len(s.gsConnectedPlayers))}, nil
}

// SetPlayerCapacity to change the game server's player capacity.
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) SetPlayerCapacity(ctx context.Context, count *alpha.Count) (*alpha.Empty, error) {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.New(string(runtime.FeaturePlayerTracking) + " not enabled")
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.Lock()
s.gsPlayerCapacity = count.Count
Expand All @@ -598,7 +680,7 @@ func (s *SDKServer) SetPlayerCapacity(ctx context.Context, count *alpha.Count) (
// [FeatureFlag:PlayerTracking]
func (s *SDKServer) GetPlayerCapacity(ctx context.Context, _ *alpha.Empty) (*alpha.Count, error) {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return nil, errors.New(string(runtime.FeaturePlayerTracking) + " not enabled")
return nil, errors.Errorf("%s not enabled", runtime.FeaturePlayerTracking)
}
s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()
Expand Down Expand Up @@ -694,3 +776,36 @@ func (s *SDKServer) updatePlayerCapacity() error {
s.recorder.Event(gs, corev1.EventTypeNormal, "PlayerCapacity", fmt.Sprintf("Set to %d", gs.Status.Players.Capacity))
return nil
}

// updateConnectedPlayers updates the Player IDs and Count fields in the GameServer's Status.
func (s *SDKServer) updateConnectedPlayers() error {
if !runtime.FeatureEnabled(runtime.FeaturePlayerTracking) {
return errors.New(string(runtime.FeaturePlayerTracking) + " not enabled")
}
gs, err := s.gameServer()
if err != nil {
return err
}

gsCopy := gs.DeepCopy()
same := false
s.gsUpdateMutex.RLock()
s.logger.WithField("playerIDs", s.gsConnectedPlayers).Debug("updating connected players")
same = apiequality.Semantic.DeepEqual(gsCopy.Status.Players.IDs, s.gsConnectedPlayers)
gsCopy.Status.Players.IDs = s.gsConnectedPlayers
gsCopy.Status.Players.Count = int64(len(s.gsConnectedPlayers))
s.gsUpdateMutex.RUnlock()
// if there is no change, then don't update
// since it's possible this could fire quite a lot, let's reduce the
// amount of requests as much as possible.
if same {
return nil
}

gs, err = s.gameServerGetter.GameServers(s.namespace).Update(gsCopy)
if err != nil {
return err
}
s.recorder.Event(gs, corev1.EventTypeNormal, "PlayerCount", fmt.Sprintf("Set to %d", gs.Status.Players.Count))
return nil
}
Loading

0 comments on commit 465326d

Please sign in to comment.