Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: AWS websocket support #459

Merged
merged 4 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cloud/aws/deploy/policy/iam.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/nitrictech/nitric/cloud/aws/deploy/queue"
"github.com/nitrictech/nitric/cloud/aws/deploy/secret"
"github.com/nitrictech/nitric/cloud/aws/deploy/topic"
"github.com/nitrictech/nitric/cloud/aws/deploy/websocket"
deploy "github.com/nitrictech/nitric/core/pkg/api/nitric/deploy/v1"
v1 "github.com/nitrictech/nitric/core/pkg/api/nitric/v1"
)
Expand All @@ -54,6 +55,7 @@ type StackResources struct {
Buckets map[string]*bucket.S3Bucket
Collections map[string]*collection.DynamodbCollection
Secrets map[string]*secret.SecretsManagerSecret
Websockets map[string]*websocket.AwsWebsocketApiGateway
}

type PrincipalMap = map[v1.ResourceType]map[string]*iam.Role
Expand Down Expand Up @@ -132,6 +134,9 @@ var awsActionsMap map[v1.Action][]string = map[v1.Action][]string{
v1.Action_SecretPut: {
"secretsmanager:PutSecretValue",
},
v1.Action_WebsocketManage: {
"execute-api:ManageConnections",
},
}

func actionsToAwsActions(actions []v1.Action) []string {
Expand Down Expand Up @@ -167,6 +172,10 @@ func arnForResource(resource *deploy.Resource, resources *StackResources) ([]int
if s, ok := resources.Secrets[resource.Name]; ok {
return []interface{}{s.SecretsManager.Arn}, nil
}
case v1.ResourceType_Websocket:
if w, ok := resources.Websockets[resource.Name]; ok {
return []interface{}{pulumi.Sprintf("%s/*", w.Api.ExecutionArn)}, nil
}
default:
return nil, fmt.Errorf(
"invalid resource type: %s. Did you mean to define it as a principal?", resource.Type)
Expand Down
19 changes: 19 additions & 0 deletions cloud/aws/deploy/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/nitrictech/nitric/cloud/aws/deploy/secret"
"github.com/nitrictech/nitric/cloud/aws/deploy/stack"
"github.com/nitrictech/nitric/cloud/aws/deploy/topic"
"github.com/nitrictech/nitric/cloud/aws/deploy/websocket"
commonDeploy "github.com/nitrictech/nitric/cloud/common/deploy"
"github.com/nitrictech/nitric/cloud/common/deploy/image"
pulumiutils "github.com/nitrictech/nitric/cloud/common/deploy/pulumi"
Expand Down Expand Up @@ -309,6 +310,23 @@ func (d *DeployServer) Up(request *deploy.DeployUpRequest, stream deploy.DeployS
}
}

// deploy websockets
websockets := map[string]*websocket.AwsWebsocketApiGateway{}
for _, res := range request.Spec.Resources {
switch ws := res.Config.(type) {
case *deploy.Resource_Websocket:
websockets[res.Name], err = websocket.NewAwsWebsocketApiGateway(ctx, res.Name, &websocket.AwsWebsocketApiGatewayArgs{
DefaultTarget: execs[ws.Websocket.MessageTarget.GetExecutionUnit()],
ConnectTarget: execs[ws.Websocket.ConnectTarget.GetExecutionUnit()],
DisconnectTarget: execs[ws.Websocket.DisconnectTarget.GetExecutionUnit()],
StackID: stackID,
})
if err != nil {
return err
}
}
}

// Deploy all schedules
schedules := map[string]*schedule.AwsEventbridgeSchedule{}
for _, res := range request.Spec.Resources {
Expand Down Expand Up @@ -381,6 +399,7 @@ func (d *DeployServer) Up(request *deploy.DeployUpRequest, stream deploy.DeployS
Queues: queues,
Collections: collections,
Secrets: secrets,
Websockets: websockets,
},
Principals: principals,
})
Expand Down
184 changes: 184 additions & 0 deletions cloud/aws/deploy/websocket/apigateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright Nitric Pty Ltd.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package websocket

import (
"fmt"

"github.com/pulumi/pulumi-aws/sdk/v5/go/aws/apigatewayv2"
awslambda "github.com/pulumi/pulumi-aws/sdk/v5/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"

"github.com/nitrictech/nitric/cloud/aws/deploy/exec"
common "github.com/nitrictech/nitric/cloud/common/deploy/tags"
)

type AwsWebsocketApiGatewayArgs struct {
DefaultTarget *exec.LambdaExecUnit
ConnectTarget *exec.LambdaExecUnit
DisconnectTarget *exec.LambdaExecUnit

StackID pulumi.StringInput
}

type AwsWebsocketApiGateway struct {
pulumi.ResourceState

Name string
Api *apigatewayv2.Api
}

func NewAwsWebsocketApiGateway(ctx *pulumi.Context, name string, args *AwsWebsocketApiGatewayArgs, opts ...pulumi.ResourceOption) (*AwsWebsocketApiGateway, error) {
res := &AwsWebsocketApiGateway{Name: name}

err := ctx.RegisterComponentResource("nitric:websocket:AwsApiGateway", name, res, opts...)
if err != nil {
return nil, err
}

opts = append(opts, pulumi.Parent(res))

res.Api, err = apigatewayv2.NewApi(ctx, name, &apigatewayv2.ApiArgs{
ProtocolType: pulumi.String("WEBSOCKET"),
Tags: common.Tags(ctx, args.StackID, name),
// TODO: We won't actually be using this, but it is required.
// Instead we'll be using the $default route
RouteSelectionExpression: pulumi.String("$request.body.action"),
}, opts...)
if err != nil {
return nil, err
}

// Create the API integrations
integrationDefault, err := apigatewayv2.NewIntegration(ctx, fmt.Sprintf("%s-default-integration", name), &apigatewayv2.IntegrationArgs{
ApiId: res.Api.ID(),
IntegrationType: pulumi.String("AWS_PROXY"),
IntegrationUri: args.DefaultTarget.Function.Arn,
})
if err != nil {
return nil, err
}

_, err = awslambda.NewPermission(ctx, fmt.Sprintf("%s-default-permission", name), &awslambda.PermissionArgs{
Function: args.DefaultTarget.Function.Name,
Action: pulumi.String("lambda:InvokeFunction"),
Principal: pulumi.String("apigateway.amazonaws.com"),
SourceArn: pulumi.Sprintf("%s/*/*", res.Api.ExecutionArn),
}, opts...)
if err != nil {
return nil, err
}

// check if the function name is different if not assign to default
integrationConnect := integrationDefault
if args.ConnectTarget != args.DefaultTarget {
integrationConnect, err = apigatewayv2.NewIntegration(ctx, fmt.Sprintf("%s-connect-integration", name), &apigatewayv2.IntegrationArgs{
ApiId: res.Api.ID(),
IntegrationType: pulumi.String("AWS_PROXY"),
IntegrationUri: args.ConnectTarget.Function.Arn,
})
if err != nil {
return nil, err
}

_, err = awslambda.NewPermission(ctx, fmt.Sprintf("%s-connect-permission", name), &awslambda.PermissionArgs{
Function: args.DefaultTarget.Function.Name,
Action: pulumi.String("lambda:InvokeFunction"),
Principal: pulumi.String("apigateway.amazonaws.com"),
SourceArn: pulumi.Sprintf("%s/*/*", res.Api.ExecutionArn),
}, opts...)
if err != nil {
return nil, err
}
}

// check if the function name is different if not assign to default
integrationDisconnect := integrationDefault
if args.DisconnectTarget != args.DefaultTarget {
integrationDisconnect, err = apigatewayv2.NewIntegration(ctx, fmt.Sprintf("%s-disconnect-integration", name), &apigatewayv2.IntegrationArgs{
ApiId: res.Api.ID(),
IntegrationType: pulumi.String("AWS_PROXY"),
IntegrationUri: args.DisconnectTarget.Function.Arn,
})
if err != nil {
return nil, err
}

_, err = awslambda.NewPermission(ctx, fmt.Sprintf("%s-disconnect-permission", name), &awslambda.PermissionArgs{
Function: args.DefaultTarget.Function.Name,
Action: pulumi.String("lambda:InvokeFunction"),
Principal: pulumi.String("apigateway.amazonaws.com"),
SourceArn: pulumi.Sprintf("%s/*/*", res.Api.ExecutionArn),
}, opts...)
if err != nil {
return nil, err
}
}

// Create the routes for the websocket handler
// The default message route
_, err = apigatewayv2.NewRoute(ctx, fmt.Sprintf("%s-default-route", name), &apigatewayv2.RouteArgs{
ApiId: res.Api.ID(),
RouteKey: pulumi.String("$default"),
Target: pulumi.Sprintf("integrations/%s", integrationDefault.ID()),
})
if err != nil {
return nil, err
}

// The client connection route
_, err = apigatewayv2.NewRoute(ctx, fmt.Sprintf("%s-connect-route", name), &apigatewayv2.RouteArgs{
ApiId: res.Api.ID(),
RouteKey: pulumi.String("$connect"),
Target: pulumi.Sprintf("integrations/%s", integrationConnect.ID()),
})
if err != nil {
return nil, err
}

// the client disconnection route
_, err = apigatewayv2.NewRoute(ctx, fmt.Sprintf("%s-disconnect-route", name), &apigatewayv2.RouteArgs{
ApiId: res.Api.ID(),
RouteKey: pulumi.String("$disconnect"),
Target: pulumi.Sprintf("integrations/%s", integrationDisconnect.ID()),
})
if err != nil {
return nil, err
}

_, err = apigatewayv2.NewStage(ctx, name+"DefaultStage", &apigatewayv2.StageArgs{
AutoDeploy: pulumi.BoolPtr(true),
Name: pulumi.String("$default"),
ApiId: res.Api.ID(),
Tags: common.Tags(ctx, args.StackID, name+"DefaultStage"),
}, opts...)
if err != nil {
return nil, err
}

if err != nil {
return nil, err
}

endPoint := res.Api.ApiEndpoint.ApplyT(func(ep string) string {
return ep
}).(pulumi.StringInput)

ctx.Export("api:"+name, endPoint)

return res, nil
}
7 changes: 4 additions & 3 deletions cloud/aws/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/avast/retry-go v3.0.0+incompatible
github.com/aws/aws-lambda-go v1.34.1
github.com/aws/aws-sdk-go v1.44.146
github.com/aws/aws-sdk-go-v2 v1.17.2
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.18.4
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.6
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.12.20
Expand Down Expand Up @@ -70,10 +70,11 @@ require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.16 // indirect
github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi v1.11.10 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.26 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.20 // indirect
Expand Down
9 changes: 9 additions & 0 deletions cloud/aws/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ github.com/aws/aws-sdk-go v1.44.146/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8
github.com/aws/aws-sdk-go-v2 v1.17.1/go.mod h1:JLnGeGONAyi2lWXI1p0PCIOIy333JMVK1U7Hf0aRFLw=
github.com/aws/aws-sdk-go-v2 v1.17.2 h1:r0yRZInwiPBNpQ4aDy/Ssh3ROWsGtKDwar2JS8Lm+N8=
github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9 h1:RKci2D7tMwpvGpDNZnGQw9wk6v7o/xSwFcUAuNPoB8k=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9/go.mod h1:vCmV1q1VK8eoQJ5+aYE7PkK1K6v41qJ5pJdK3ggCDvg=
github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE=
Expand All @@ -130,13 +132,19 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25/go.mod h1:Zb29PYkf42vVYQY6pvSyJCJcFHlPIiY+YKdPtwnvMkY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 h1:5WU31cY7m0tG+AiaXuXGoMzo2GBQ1IixtWa8Yywsgco=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19/go.mod h1:6Q0546uHDp421okhmmGfbxzq2hBqbXFNpi4k+Q1JnQA=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 h1:WW0qSzDWoiWU2FS5DbKpxGilFVlCEJPwx4YtjdfI0Jw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 h1:N2eKFw2S+JWRCtTt0IhIX7uoGGQciD4p6ba+SJv4WEU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.16 h1:2EXB7dtGwRYIN3XQ9qwIW504DVbKIw3r89xQnonGdsQ=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.16/go.mod h1:XH+3h395e3WVdd6T2Z3mPxuI+x/HVtdqVOREkTiyubs=
github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi v1.11.10 h1:os9Aix72xeiZ9+wQ2LZJSoHOzGUqKYLLS9S7Y4BaRmI=
github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi v1.11.10/go.mod h1:rFWa3WA43LkZ9pAJkGuO90kU+N0Ru2dCJwjRfZ8kKZ8=
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.12.20 h1:7N4o3yLag3c3c22POkmCAfrr/OQG5807a9NRh9lUUKw=
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.12.20/go.mod h1:BEIWaGqO27qq9JeFeY746S4+SFmBajpV+yhGne2qbMo=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.7/go.mod h1:BiglbKCG56L8tmMnUEyEQo422BO9xnNR8vVHnOsByf8=
Expand Down Expand Up @@ -1076,6 +1084,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw=
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
16 changes: 16 additions & 0 deletions cloud/aws/mocks/provider/aws.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cloud/aws/runtime/cmd/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
sqs_service "github.com/nitrictech/nitric/cloud/aws/runtime/queue"
secrets_manager_secret_service "github.com/nitrictech/nitric/cloud/aws/runtime/secret"
s3_service "github.com/nitrictech/nitric/cloud/aws/runtime/storage"
"github.com/nitrictech/nitric/cloud/aws/runtime/websocket"
base_http "github.com/nitrictech/nitric/cloud/common/runtime/gateway"
"github.com/nitrictech/nitric/core/pkg/membrane"
"github.com/nitrictech/nitric/core/pkg/utils"
Expand Down Expand Up @@ -62,6 +63,7 @@ func main() {
membraneOpts.StoragePlugin, _ = s3_service.New(provider)
membraneOpts.ResourcesPlugin = provider
membraneOpts.CreateTracerProvider = newTracerProvider
membraneOpts.WebsocketPlugin, _ = websocket.NewAwsApiGatewayWebsocket(provider)

m, err := membrane.New(membraneOpts)
if err != nil {
Expand Down
Loading