Skip to content

Commit

Permalink
AWS websocket implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm committed Jul 6, 2023
1 parent 0dd65ae commit 42d2750
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 11 deletions.
9 changes: 9 additions & 0 deletions cloud/aws/deploy/policy/iam.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/nitrictech/nitric/cloud/aws/deploy/queue"
"github.com/nitrictech/nitric/cloud/aws/deploy/secret"
"github.com/nitrictech/nitric/cloud/aws/deploy/topic"
"github.com/nitrictech/nitric/cloud/aws/deploy/websocket"
deploy "github.com/nitrictech/nitric/core/pkg/api/nitric/deploy/v1"
v1 "github.com/nitrictech/nitric/core/pkg/api/nitric/v1"
)
Expand All @@ -54,6 +55,7 @@ type StackResources struct {
Buckets map[string]*bucket.S3Bucket
Collections map[string]*collection.DynamodbCollection
Secrets map[string]*secret.SecretsManagerSecret
Websockets map[string]*websocket.AwsWebsocketApiGateway
}

type PrincipalMap = map[v1.ResourceType]map[string]*iam.Role
Expand Down Expand Up @@ -132,6 +134,9 @@ var awsActionsMap map[v1.Action][]string = map[v1.Action][]string{
v1.Action_SecretPut: {
"secretsmanager:PutSecretValue",
},
v1.Action_WebsocketManage: {
"execute-api:ManageConnections",
},
}

func actionsToAwsActions(actions []v1.Action) []string {
Expand Down Expand Up @@ -167,6 +172,10 @@ func arnForResource(resource *deploy.Resource, resources *StackResources) ([]int
if s, ok := resources.Secrets[resource.Name]; ok {
return []interface{}{s.SecretsManager.Arn}, nil
}
case v1.ResourceType_Websocket:
if w, ok := resources.Websockets[resource.Name]; ok {
return []interface{}{pulumi.Sprintf("%s/*", w.Api.ExecutionArn)}, nil
}
default:
return nil, fmt.Errorf(
"invalid resource type: %s. Did you mean to define it as a principal?", resource.Type)
Expand Down
19 changes: 19 additions & 0 deletions cloud/aws/deploy/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/nitrictech/nitric/cloud/aws/deploy/secret"
"github.com/nitrictech/nitric/cloud/aws/deploy/stack"
"github.com/nitrictech/nitric/cloud/aws/deploy/topic"
"github.com/nitrictech/nitric/cloud/aws/deploy/websocket"
commonDeploy "github.com/nitrictech/nitric/cloud/common/deploy"
"github.com/nitrictech/nitric/cloud/common/deploy/image"
pulumiutils "github.com/nitrictech/nitric/cloud/common/deploy/pulumi"
Expand Down Expand Up @@ -309,6 +310,23 @@ func (d *DeployServer) Up(request *deploy.DeployUpRequest, stream deploy.DeployS
}
}

// deploy websockets
websockets := map[string]*websocket.AwsWebsocketApiGateway{}
for _, res := range request.Spec.Resources {
switch ws := res.Config.(type) {
case *deploy.Resource_Websocket:
websockets[res.Name], err = websocket.NewAwsWebsocketApiGateway(ctx, res.Name, &websocket.AwsWebsocketApiGatewayArgs{
DefaultTarget: execs[ws.Websocket.MessageTarget.GetExecutionUnit()],
ConnectTarget: execs[ws.Websocket.ConnectTarget.GetExecutionUnit()],
DisconnectTarget: execs[ws.Websocket.DisconnectTarget.GetExecutionUnit()],
StackID: stackID,
})
if err != nil {
return err
}
}
}

// Deploy all schedules
schedules := map[string]*schedule.AwsEventbridgeSchedule{}
for _, res := range request.Spec.Resources {
Expand Down Expand Up @@ -381,6 +399,7 @@ func (d *DeployServer) Up(request *deploy.DeployUpRequest, stream deploy.DeployS
Queues: queues,
Collections: collections,
Secrets: secrets,
Websockets: websockets,
},
Principals: principals,
})
Expand Down
184 changes: 184 additions & 0 deletions cloud/aws/deploy/websocket/apigateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright Nitric Pty Ltd.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package websocket

import (
"fmt"

"github.com/pulumi/pulumi-aws/sdk/v5/go/aws/apigatewayv2"
awslambda "github.com/pulumi/pulumi-aws/sdk/v5/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"

"github.com/nitrictech/nitric/cloud/aws/deploy/exec"
common "github.com/nitrictech/nitric/cloud/common/deploy/tags"
)

type AwsWebsocketApiGatewayArgs struct {
DefaultTarget *exec.LambdaExecUnit
ConnectTarget *exec.LambdaExecUnit
DisconnectTarget *exec.LambdaExecUnit

StackID pulumi.StringInput
}

type AwsWebsocketApiGateway struct {
pulumi.ResourceState

Name string
Api *apigatewayv2.Api
}

func NewAwsWebsocketApiGateway(ctx *pulumi.Context, name string, args *AwsWebsocketApiGatewayArgs, opts ...pulumi.ResourceOption) (*AwsWebsocketApiGateway, error) {
res := &AwsWebsocketApiGateway{Name: name}

err := ctx.RegisterComponentResource("nitric:websocket:AwsApiGateway", name, res, opts...)
if err != nil {
return nil, err
}

opts = append(opts, pulumi.Parent(res))

res.Api, err = apigatewayv2.NewApi(ctx, name, &apigatewayv2.ApiArgs{
ProtocolType: pulumi.String("WEBSOCKET"),
Tags: common.Tags(ctx, args.StackID, name),
// TODO: We won't actually be using this, but it is required.
// Instead we'll be using the $default route
RouteSelectionExpression: pulumi.String("$request.body.action"),
}, opts...)
if err != nil {
return nil, err
}

// Create the API integrations
integrationDefault, err := apigatewayv2.NewIntegration(ctx, fmt.Sprintf("%s-default-integration", name), &apigatewayv2.IntegrationArgs{
ApiId: res.Api.ID(),
IntegrationType: pulumi.String("AWS_PROXY"),
IntegrationUri: args.DefaultTarget.Function.Arn,
})
if err != nil {
return nil, err
}

_, err = awslambda.NewPermission(ctx, fmt.Sprintf("%s-default-permission", name), &awslambda.PermissionArgs{
Function: args.DefaultTarget.Function.Name,
Action: pulumi.String("lambda:InvokeFunction"),
Principal: pulumi.String("apigateway.amazonaws.com"),
SourceArn: pulumi.Sprintf("%s/*/*", res.Api.ExecutionArn),
}, opts...)
if err != nil {
return nil, err
}

// check if the function name is different if not assign to default
integrationConnect := integrationDefault
if args.ConnectTarget != args.DefaultTarget {
integrationConnect, err = apigatewayv2.NewIntegration(ctx, fmt.Sprintf("%s-connect-integration", name), &apigatewayv2.IntegrationArgs{
ApiId: res.Api.ID(),
IntegrationType: pulumi.String("AWS_PROXY"),
IntegrationUri: args.ConnectTarget.Function.Arn,
})
if err != nil {
return nil, err
}

_, err = awslambda.NewPermission(ctx, fmt.Sprintf("%s-connect-permission", name), &awslambda.PermissionArgs{
Function: args.DefaultTarget.Function.Name,
Action: pulumi.String("lambda:InvokeFunction"),
Principal: pulumi.String("apigateway.amazonaws.com"),
SourceArn: pulumi.Sprintf("%s/*/*", res.Api.ExecutionArn),
}, opts...)
if err != nil {
return nil, err
}
}

// check if the function name is different if not assign to default
integrationDisconnect := integrationDefault
if args.DisconnectTarget != args.DefaultTarget {
integrationDisconnect, err = apigatewayv2.NewIntegration(ctx, fmt.Sprintf("%s-disconnect-integration", name), &apigatewayv2.IntegrationArgs{
ApiId: res.Api.ID(),
IntegrationType: pulumi.String("AWS_PROXY"),
IntegrationUri: args.DisconnectTarget.Function.Arn,
})
if err != nil {
return nil, err
}

_, err = awslambda.NewPermission(ctx, fmt.Sprintf("%s-disconnect-permission", name), &awslambda.PermissionArgs{
Function: args.DefaultTarget.Function.Name,
Action: pulumi.String("lambda:InvokeFunction"),
Principal: pulumi.String("apigateway.amazonaws.com"),
SourceArn: pulumi.Sprintf("%s/*/*", res.Api.ExecutionArn),
}, opts...)
if err != nil {
return nil, err
}
}

// Create the routes for the websocket handler
// The default message route
_, err = apigatewayv2.NewRoute(ctx, fmt.Sprintf("%s-default-route", name), &apigatewayv2.RouteArgs{
ApiId: res.Api.ID(),
RouteKey: pulumi.String("$default"),
Target: pulumi.Sprintf("integrations/%s", integrationDefault.ID()),
})
if err != nil {
return nil, err
}

// The client connection route
_, err = apigatewayv2.NewRoute(ctx, fmt.Sprintf("%s-connect-route", name), &apigatewayv2.RouteArgs{
ApiId: res.Api.ID(),
RouteKey: pulumi.String("$connect"),
Target: pulumi.Sprintf("integrations/%s", integrationConnect.ID()),
})
if err != nil {
return nil, err
}

// the client disconnection route
_, err = apigatewayv2.NewRoute(ctx, fmt.Sprintf("%s-disconnect-route", name), &apigatewayv2.RouteArgs{
ApiId: res.Api.ID(),
RouteKey: pulumi.String("$disconnect"),
Target: pulumi.Sprintf("integrations/%s", integrationDisconnect.ID()),
})
if err != nil {
return nil, err
}

_, err = apigatewayv2.NewStage(ctx, name+"DefaultStage", &apigatewayv2.StageArgs{
AutoDeploy: pulumi.BoolPtr(true),
Name: pulumi.String("$default"),
ApiId: res.Api.ID(),
Tags: common.Tags(ctx, args.StackID, name+"DefaultStage"),
}, opts...)
if err != nil {
return nil, err
}

if err != nil {
return nil, err
}

endPoint := res.Api.ApiEndpoint.ApplyT(func(ep string) string {
return ep
}).(pulumi.StringInput)

ctx.Export("api:"+name, endPoint)

return res, nil
}
7 changes: 4 additions & 3 deletions cloud/aws/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/avast/retry-go v3.0.0+incompatible
github.com/aws/aws-lambda-go v1.34.1
github.com/aws/aws-sdk-go v1.44.146
github.com/aws/aws-sdk-go-v2 v1.17.2
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.18.4
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.10.6
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.12.20
Expand Down Expand Up @@ -70,10 +70,11 @@ require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.16 // indirect
github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi v1.11.10 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.13.26 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.20 // indirect
Expand Down
9 changes: 9 additions & 0 deletions cloud/aws/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ github.com/aws/aws-sdk-go v1.44.146/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8
github.com/aws/aws-sdk-go-v2 v1.17.1/go.mod h1:JLnGeGONAyi2lWXI1p0PCIOIy333JMVK1U7Hf0aRFLw=
github.com/aws/aws-sdk-go-v2 v1.17.2 h1:r0yRZInwiPBNpQ4aDy/Ssh3ROWsGtKDwar2JS8Lm+N8=
github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9 h1:RKci2D7tMwpvGpDNZnGQw9wk6v7o/xSwFcUAuNPoB8k=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.9/go.mod h1:vCmV1q1VK8eoQJ5+aYE7PkK1K6v41qJ5pJdK3ggCDvg=
github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE=
Expand All @@ -130,13 +132,19 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25/go.mod h1:Zb29PYkf42vVYQY6pvSyJCJcFHlPIiY+YKdPtwnvMkY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 h1:5WU31cY7m0tG+AiaXuXGoMzo2GBQ1IixtWa8Yywsgco=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19/go.mod h1:6Q0546uHDp421okhmmGfbxzq2hBqbXFNpi4k+Q1JnQA=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 h1:WW0qSzDWoiWU2FS5DbKpxGilFVlCEJPwx4YtjdfI0Jw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 h1:N2eKFw2S+JWRCtTt0IhIX7uoGGQciD4p6ba+SJv4WEU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.16 h1:2EXB7dtGwRYIN3XQ9qwIW504DVbKIw3r89xQnonGdsQ=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.16/go.mod h1:XH+3h395e3WVdd6T2Z3mPxuI+x/HVtdqVOREkTiyubs=
github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi v1.11.10 h1:os9Aix72xeiZ9+wQ2LZJSoHOzGUqKYLLS9S7Y4BaRmI=
github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi v1.11.10/go.mod h1:rFWa3WA43LkZ9pAJkGuO90kU+N0Ru2dCJwjRfZ8kKZ8=
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.12.20 h1:7N4o3yLag3c3c22POkmCAfrr/OQG5807a9NRh9lUUKw=
github.com/aws/aws-sdk-go-v2/service/apigatewayv2 v1.12.20/go.mod h1:BEIWaGqO27qq9JeFeY746S4+SFmBajpV+yhGne2qbMo=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.7/go.mod h1:BiglbKCG56L8tmMnUEyEQo422BO9xnNR8vVHnOsByf8=
Expand Down Expand Up @@ -1076,6 +1084,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw=
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
16 changes: 16 additions & 0 deletions cloud/aws/mocks/provider/aws.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cloud/aws/runtime/cmd/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
sqs_service "github.com/nitrictech/nitric/cloud/aws/runtime/queue"
secrets_manager_secret_service "github.com/nitrictech/nitric/cloud/aws/runtime/secret"
s3_service "github.com/nitrictech/nitric/cloud/aws/runtime/storage"
"github.com/nitrictech/nitric/cloud/aws/runtime/websocket"
base_http "github.com/nitrictech/nitric/cloud/common/runtime/gateway"
"github.com/nitrictech/nitric/core/pkg/membrane"
"github.com/nitrictech/nitric/core/pkg/utils"
Expand Down Expand Up @@ -62,6 +63,7 @@ func main() {
membraneOpts.StoragePlugin, _ = s3_service.New(provider)
membraneOpts.ResourcesPlugin = provider
membraneOpts.CreateTracerProvider = newTracerProvider
membraneOpts.WebsocketPlugin, _ = websocket.NewAwsApiGatewayWebsocket(provider)

m, err := membrane.New(membraneOpts)
if err != nil {
Expand Down
Loading

0 comments on commit 42d2750

Please sign in to comment.