Skip to content

Commit

Permalink
adding timeout to allocate call and adding total timeout to allocate
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro Kislov committed Sep 15, 2020
1 parent f31221c commit da989a1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
19 changes: 15 additions & 4 deletions pkg/gameserverallocations/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ const (
batchWaitTime = 500 * time.Millisecond
)

const (
// remoteAllocateTimeout is the maximum time a single Allocate RPC call
// to a remote cluster may take before its context is cancelled.
remoteAllocateTimeout = 10 * time.Second
// totalRemoteAllocationTimeout is the overall time budget for a remote
// allocation, covering every per-call attempt including retries.
totalRemoteAllocationTimeout = 30 * time.Second
)

var allocationRetry = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Expand All @@ -106,7 +113,7 @@ type Allocator struct {
pendingRequests chan request
readyGameServerCache *ReadyGameServerCache
topNGameServerCount int
remoteAllocationCallback func(string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
remoteAllocationCallback func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
}

// request is an async request for allocation
Expand All @@ -133,15 +140,17 @@ func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPoli
secretSynced: secretInformer.Informer().HasSynced,
readyGameServerCache: readyGameServerCache,
topNGameServerCount: topNGameServerDefaultCount,
remoteAllocationCallback: func(endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
conn, err := grpc.Dial(endpoint, dialOpts)
if err != nil {
return nil, err
}
defer conn.Close() // nolint: errcheck

allocationCtx, cancel := context.WithTimeout(ctx, remoteAllocateTimeout)
defer cancel() // nolint: errcheck
grpcClient := pb.NewAllocationServiceClient(conn)
return grpcClient.Allocate(context.Background(), request)
return grpcClient.Allocate(allocationCtx, request)
},
}

Expand Down Expand Up @@ -336,13 +345,15 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca
request.MultiClusterSetting.Enabled = false
request.Namespace = connectionInfo.Namespace

ctx, cancel := context.WithTimeout(context.Background(), totalRemoteAllocationTimeout)
defer cancel() // nolint: errcheck
// Retry on remote call failures.
err = Retry(remoteAllocationRetry, func() error {
for i, ip := range connectionInfo.AllocationEndpoints {
endpoint := addPort(ip)
c.loggerForGameServerAllocationKey("remote-allocation").WithField("request", request).WithField("endpoint", endpoint).Debug("forwarding allocation request")

allocationResponse, err = c.remoteAllocationCallback(endpoint, dialOpts, request)
allocationResponse, err = c.remoteAllocationCallback(ctx, endpoint, dialOpts, request)
if err != nil {
c.baseLogger.Errorf("remote allocation failed with: %v", err)
// If there are multiple endpoints for the allocator connection and the current one is
Expand Down
7 changes: 4 additions & 3 deletions pkg/gameserverallocations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gameserverallocations

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -968,7 +969,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
},
}

c.allocator.remoteAllocationCallback = func(e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
assert.Equal(t, endpoint+":443", e)
serverResponse := pb.AllocationResponse{
GameServerName: expectedGSName,
Expand All @@ -991,7 +992,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
retry := 0
endpoint := "z.z.z.z"

c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
if count == 0 {
serverResponse := pb.AllocationResponse{}
count++
Expand Down Expand Up @@ -1087,7 +1088,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) {
healthyEndpoint := "healthy_endpoint:443"

expectedGSName := "mocked"
c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
if endpoint == unhealthyEndpoint {
return nil, errors.New("test error message")
}
Expand Down

0 comments on commit da989a1

Please sign in to comment.