diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index 3b09cb63fb..cb66faca11 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -82,6 +82,13 @@ const ( batchWaitTime = 500 * time.Millisecond ) +const ( + // Timeout for an individual Allocate RPC call can take + remoteAllocateTimeout = 10 * time.Second + // Total timeout for allocation including retries + totalRemoteAllocationTimeout = 30 * time.Second +) + var allocationRetry = wait.Backoff{ Steps: 5, Duration: 10 * time.Millisecond, @@ -106,7 +113,7 @@ type Allocator struct { pendingRequests chan request readyGameServerCache *ReadyGameServerCache topNGameServerCount int - remoteAllocationCallback func(string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error) + remoteAllocationCallback func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error) } // request is an async request for allocation @@ -133,15 +140,17 @@ func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPoli secretSynced: secretInformer.Informer().HasSynced, readyGameServerCache: readyGameServerCache, topNGameServerCount: topNGameServerDefaultCount, - remoteAllocationCallback: func(endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { + remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { conn, err := grpc.Dial(endpoint, dialOpts) if err != nil { return nil, err } defer conn.Close() // nolint: errcheck + allocationCtx, cancel := context.WithTimeout(ctx, remoteAllocateTimeout) + defer cancel() // nolint: errcheck grpcClient := pb.NewAllocationServiceClient(conn) - return grpcClient.Allocate(context.Background(), request) + return grpcClient.Allocate(allocationCtx, request) }, } @@ -336,13 +345,14 @@ func (c *Allocator) allocateFromRemoteCluster(gsa *allocationv1.GameServerAlloca request.MultiClusterSetting.Enabled = false request.Namespace = connectionInfo.Namespace + ctx, _ := context.WithTimeout(context.Background(), totalRemoteAllocationTimeout) // Retry on remote call failures. err = Retry(remoteAllocationRetry, func() error { for i, ip := range connectionInfo.AllocationEndpoints { endpoint := addPort(ip) c.loggerForGameServerAllocationKey("remote-allocation").WithField("request", request).WithField("endpoint", endpoint).Debug("forwarding allocation request") - allocationResponse, err = c.remoteAllocationCallback(endpoint, dialOpts, request) + allocationResponse, err = c.remoteAllocationCallback(ctx, endpoint, dialOpts, request) if err != nil { c.baseLogger.Errorf("remote allocation failed with: %v", err) // If there are multiple enpoints for the allocator connection and the current one is diff --git a/pkg/gameserverallocations/controller_test.go b/pkg/gameserverallocations/controller_test.go index d801ad5569..75962bfec6 100644 --- a/pkg/gameserverallocations/controller_test.go +++ b/pkg/gameserverallocations/controller_test.go @@ -16,6 +16,7 @@ package gameserverallocations import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -968,7 +969,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) { }, } - c.allocator.remoteAllocationCallback = func(e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { + c.allocator.remoteAllocationCallback = func(ctx context.Context, e string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { assert.Equal(t, endpoint+":443", e) serverResponse := pb.AllocationResponse{ GameServerName: expectedGSName, @@ -991,7 +992,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) { retry := 0 endpoint := "z.z.z.z" - c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { + c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { if count == 0 { serverResponse := pb.AllocationResponse{} count++ @@ -1087,7 +1088,7 @@ func TestMultiClusterAllocationFromRemote(t *testing.T) { healthyEndpoint := "healthy_endpoint:443" expectedGSName := "mocked" - c.allocator.remoteAllocationCallback = func(endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { + c.allocator.remoteAllocationCallback = func(ctx context.Context, endpoint string, dialOpt grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) { if endpoint == unhealthyEndpoint { return nil, errors.New("test error message") }