From 8129a5ef5d84f7bd226c59825a747a72c3e3cf04 Mon Sep 17 00:00:00 2001 From: Heming Han Date: Mon, 12 Jun 2023 10:42:17 -0700 Subject: [PATCH] Move eventstream to /ecs-agent and remove /agent/wsclient (#3746) * Move eventstream to /ecs-agent * remove wsclient from /agent (both acs and tcs client has been using /ecs-agent/wsclient --- agent/acs/handler/acs_handler.go | 2 +- agent/acs/handler/acs_handler_test.go | 2 +- agent/app/agent.go | 2 +- agent/app/agent_compatibility_linux_test.go | 2 +- agent/app/agent_test.go | 2 +- agent/app/agent_unix_test.go | 2 +- agent/app/agent_windows_test.go | 2 +- agent/app/data.go | 2 +- agent/app/data_test.go | 2 +- agent/engine/common_integ_test.go | 2 +- agent/engine/default.go | 2 +- agent/engine/docker_task_engine.go | 2 +- agent/engine/docker_task_engine_test.go | 2 +- agent/engine/engine_sudo_linux_integ_test.go | 2 +- agent/engine/engine_windows_integ_test.go | 2 +- agent/engine/task_manager.go | 2 +- agent/engine/task_manager_data_test.go | 2 +- agent/engine/task_manager_test.go | 2 +- agent/stats/common_test.go | 2 +- agent/stats/engine.go | 2 +- agent/tcs/handler/handler.go | 2 +- agent/tcs/handler/handler_test.go | 2 +- agent/tcs/handler/types.go | 2 +- .../ecs-agent}/eventstream/eventstream.go | 2 - agent/vendor/modules.txt | 1 + agent/wsclient/client.go | 489 ------------------ agent/wsclient/client_test.go | 282 ---------- agent/wsclient/decode.go | 101 ---- agent/wsclient/error.go | 73 --- agent/wsclient/errors.go | 61 --- agent/wsclient/generate_mocks.go | 16 - agent/wsclient/mock/client.go | 217 -------- agent/wsclient/mock/utils/utils.go | 67 --- agent/wsclient/types.go | 59 --- agent/wsclient/wsconn/conn.go | 27 - agent/wsclient/wsconn/generate_mocks.go | 16 - agent/wsclient/wsconn/mock/conn.go | 135 ----- ecs-agent/eventstream/eventstream.go | 130 +++++ .../eventstream/eventstream_test.go | 0 39 files changed, 154 insertions(+), 1568 deletions(-) rename agent/{ => vendor/github.com/aws/amazon-ecs-agent/ecs-agent}/eventstream/eventstream.go (96%) delete mode 100644 agent/wsclient/client.go delete mode 100644 agent/wsclient/client_test.go delete mode 100644 agent/wsclient/decode.go delete mode 100644 agent/wsclient/error.go delete mode 100644 agent/wsclient/errors.go delete mode 100644 agent/wsclient/generate_mocks.go delete mode 100644 agent/wsclient/mock/client.go delete mode 100644 agent/wsclient/mock/utils/utils.go delete mode 100644 agent/wsclient/types.go delete mode 100644 agent/wsclient/wsconn/conn.go delete mode 100644 agent/wsclient/wsconn/generate_mocks.go delete mode 100644 agent/wsclient/wsconn/mock/conn.go create mode 100644 ecs-agent/eventstream/eventstream.go rename {agent => ecs-agent}/eventstream/eventstream_test.go (100%) diff --git a/agent/acs/handler/acs_handler.go b/agent/acs/handler/acs_handler.go index ab6df1cd74b..9739005472c 100644 --- a/agent/acs/handler/acs_handler.go +++ b/agent/acs/handler/acs_handler.go @@ -32,11 +32,11 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine" "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/eventhandler" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/version" acssession "github.com/aws/amazon-ecs-agent/ecs-agent/acs/session" rolecredentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime" "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" diff --git a/agent/acs/handler/acs_handler_test.go b/agent/acs/handler/acs_handler_test.go index 8f1d31f4294..c5bcc5c147c 100644 --- a/agent/acs/handler/acs_handler_test.go +++ b/agent/acs/handler/acs_handler_test.go @@ -40,12 +40,12 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" "github.com/aws/amazon-ecs-agent/agent/eventhandler" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/version" acsclient "github.com/aws/amazon-ecs-agent/ecs-agent/acs/client" rolecredentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" mock_credentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials/mocks" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" mock_retry "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry/mock" mock_wsclient "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/mock" diff --git a/agent/app/agent.go b/agent/app/agent.go index f748c1384a7..7b87c272ed3 100644 --- a/agent/app/agent.go +++ b/agent/app/agent.go @@ -50,7 +50,6 @@ import ( engineserviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" "github.com/aws/amazon-ecs-agent/agent/eni/pause" "github.com/aws/amazon-ecs-agent/agent/eventhandler" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/handlers" "github.com/aws/amazon-ecs-agent/agent/sighandlers" "github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes" @@ -65,6 +64,7 @@ import ( acsclient "github.com/aws/amazon-ecs-agent/ecs-agent/acs/client" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" "github.com/aws/aws-sdk-go/aws" diff --git a/agent/app/agent_compatibility_linux_test.go b/agent/app/agent_compatibility_linux_test.go index 2f3c1a92ac0..b7833d0a16c 100644 --- a/agent/app/agent_compatibility_linux_test.go +++ b/agent/app/agent_compatibility_linux_test.go @@ -25,8 +25,8 @@ import ( "github.com/aws/amazon-ecs-agent/agent/data" "github.com/aws/amazon-ecs-agent/agent/ec2" "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" - "github.com/aws/amazon-ecs-agent/agent/eventstream" mock_statemanager "github.com/aws/amazon-ecs-agent/agent/statemanager/mocks" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" diff --git a/agent/app/agent_test.go b/agent/app/agent_test.go index a1ea0ecc7b7..23ee5aa869c 100644 --- a/agent/app/agent_test.go +++ b/agent/app/agent_test.go @@ -42,7 +42,6 @@ import ( mock_execcmdagent "github.com/aws/amazon-ecs-agent/agent/engine/execcmd/mocks" mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" mock_serviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect/mock" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes" "github.com/aws/amazon-ecs-agent/agent/statemanager" mock_statemanager "github.com/aws/amazon-ecs-agent/agent/statemanager/mocks" @@ -51,6 +50,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/version" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" mock_credentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials/mocks" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" aws_credentials "github.com/aws/aws-sdk-go/aws/credentials" diff --git a/agent/app/agent_unix_test.go b/agent/app/agent_unix_test.go index 32149e574cf..ccd1a622614 100644 --- a/agent/app/agent_unix_test.go +++ b/agent/app/agent_unix_test.go @@ -38,13 +38,13 @@ import ( mock_serviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect/mock" mock_udev "github.com/aws/amazon-ecs-agent/agent/eni/udevwrapper/mocks" "github.com/aws/amazon-ecs-agent/agent/eni/watcher" - "github.com/aws/amazon-ecs-agent/agent/eventstream" mock_gpu "github.com/aws/amazon-ecs-agent/agent/gpu/mocks" "github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes" "github.com/aws/amazon-ecs-agent/agent/taskresource" "github.com/aws/amazon-ecs-agent/agent/taskresource/cgroup/control/mock_control" mock_loader "github.com/aws/amazon-ecs-agent/agent/utils/loader/mocks" mock_mobypkgwrapper "github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper/mocks" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" diff --git a/agent/app/agent_windows_test.go b/agent/app/agent_windows_test.go index c1657ef3c12..b55f9fe1137 100644 --- a/agent/app/agent_windows_test.go +++ b/agent/app/agent_windows_test.go @@ -27,10 +27,10 @@ import ( "github.com/aws/amazon-ecs-agent/agent/ec2" mock_dockerstate "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate/mocks" mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/sighandlers" "github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes" statemanager_mocks "github.com/aws/amazon-ecs-agent/agent/statemanager/mocks" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" diff --git a/agent/app/data.go b/agent/app/data.go index fb5cebeed04..77dcafe94ab 100644 --- a/agent/app/data.go +++ b/agent/app/data.go @@ -22,8 +22,8 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/pkg/errors" ) diff --git a/agent/app/data_test.go b/agent/app/data_test.go index 1ec1a0b7969..cad597134fa 100644 --- a/agent/app/data_test.go +++ b/agent/app/data_test.go @@ -28,10 +28,10 @@ import ( "github.com/aws/amazon-ecs-agent/agent/data" "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/engine/image" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/statemanager" "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachmentinfo" "github.com/aws/amazon-ecs-agent/ecs-agent/api/eni" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" diff --git a/agent/engine/common_integ_test.go b/agent/engine/common_integ_test.go index b62d38849a2..eb15a1ab2f5 100644 --- a/agent/engine/common_integ_test.go +++ b/agent/engine/common_integ_test.go @@ -37,12 +37,12 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" engineserviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" - "github.com/aws/amazon-ecs-agent/agent/eventstream" s3factory "github.com/aws/amazon-ecs-agent/agent/s3/factory" ssmfactory "github.com/aws/amazon-ecs-agent/agent/ssm/factory" "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/agent/taskresource" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" log "github.com/cihub/seelog" "github.com/stretchr/testify/assert" ) diff --git a/agent/engine/default.go b/agent/engine/default.go index 2a8ca358269..d83c53a89cc 100644 --- a/agent/engine/default.go +++ b/agent/engine/default.go @@ -22,9 +22,9 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/taskresource" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" ) // NewTaskEngine returns a default TaskEngine diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index a5fd5d539fa..0ed71d7d80c 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -40,7 +40,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/metrics" "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/agent/taskresource" @@ -51,6 +50,7 @@ import ( "github.com/aws/amazon-ecs-agent/ecs-agent/api/appnet" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/logger" "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" diff --git a/agent/engine/docker_task_engine_test.go b/agent/engine/docker_task_engine_test.go index b7aeb3d1158..7b9cdf60020 100644 --- a/agent/engine/docker_task_engine_test.go +++ b/agent/engine/docker_task_engine_test.go @@ -55,7 +55,6 @@ import ( mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" mock_engineserviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect/mock" "github.com/aws/amazon-ecs-agent/agent/engine/testdata" - "github.com/aws/amazon-ecs-agent/agent/eventstream" mock_ssm_factory "github.com/aws/amazon-ecs-agent/agent/ssm/factory/mocks" mock_ssmiface "github.com/aws/amazon-ecs-agent/agent/ssm/mocks" "github.com/aws/amazon-ecs-agent/agent/taskresource" @@ -68,6 +67,7 @@ import ( apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" mock_credentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials/mocks" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" mock_ttime "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime/mocks" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/secretsmanager" diff --git a/agent/engine/engine_sudo_linux_integ_test.go b/agent/engine/engine_sudo_linux_integ_test.go index 99edd332b1a..3b84e14b30d 100644 --- a/agent/engine/engine_sudo_linux_integ_test.go +++ b/agent/engine/engine_sudo_linux_integ_test.go @@ -62,7 +62,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" engineserviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/taskresource" cgroup "github.com/aws/amazon-ecs-agent/agent/taskresource/cgroup/control" "github.com/aws/amazon-ecs-agent/agent/taskresource/firelens" @@ -70,6 +69,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/utils" "github.com/aws/amazon-ecs-agent/agent/utils/ioutilwrapper" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" ) var ( diff --git a/agent/engine/engine_windows_integ_test.go b/agent/engine/engine_windows_integ_test.go index 7418c69d418..d63d7b0cf61 100644 --- a/agent/engine/engine_windows_integ_test.go +++ b/agent/engine/engine_windows_integ_test.go @@ -44,13 +44,13 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" "github.com/aws/amazon-ecs-agent/agent/engine/execcmd" engineserviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect" - "github.com/aws/amazon-ecs-agent/agent/eventstream" s3factory "github.com/aws/amazon-ecs-agent/agent/s3/factory" ssmfactory "github.com/aws/amazon-ecs-agent/agent/ssm/factory" "github.com/aws/amazon-ecs-agent/agent/taskresource" taskresourcevolume "github.com/aws/amazon-ecs-agent/agent/taskresource/volume" "github.com/aws/amazon-ecs-agent/agent/utils" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/aws-sdk-go/aws" "github.com/cihub/seelog" diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index dde9bcbe29f..61654592a70 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -33,13 +33,13 @@ import ( "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" "github.com/aws/amazon-ecs-agent/agent/ecscni" "github.com/aws/amazon-ecs-agent/agent/engine/dependencygraph" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/agent/taskresource" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/logger" "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" diff --git a/agent/engine/task_manager_data_test.go b/agent/engine/task_manager_data_test.go index e4eb48df071..ada649d37b4 100644 --- a/agent/engine/task_manager_data_test.go +++ b/agent/engine/task_manager_data_test.go @@ -25,13 +25,13 @@ import ( apitask "github.com/aws/amazon-ecs-agent/agent/api/task" apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/agent/taskresource" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" resourcetype "github.com/aws/amazon-ecs-agent/agent/taskresource/types" "github.com/aws/amazon-ecs-agent/agent/taskresource/volume" utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/agent/engine/task_manager_test.go b/agent/engine/task_manager_test.go index aab5f64e76a..247bdba0419 100644 --- a/agent/engine/task_manager_test.go +++ b/agent/engine/task_manager_test.go @@ -44,7 +44,6 @@ import ( mock_dockerstate "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate/mocks" mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" "github.com/aws/amazon-ecs-agent/agent/engine/testdata" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes" "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/agent/taskresource/volume" @@ -52,6 +51,7 @@ import ( apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" mock_credentials "github.com/aws/amazon-ecs-agent/ecs-agent/credentials/mocks" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" mock_ttime "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime/mocks" "github.com/stretchr/testify/assert" diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index 2636b8e2bda..edc4e07ba65 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -24,8 +24,8 @@ import ( "github.com/aws/amazon-ecs-agent/agent/config" "github.com/aws/amazon-ecs-agent/agent/data" "github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/statechange" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" "github.com/aws/aws-sdk-go/aws" diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 2843afbc788..178a110aef9 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -36,8 +36,8 @@ import ( "github.com/aws/amazon-ecs-agent/agent/dockerclient" "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" ecsengine "github.com/aws/amazon-ecs-agent/agent/engine" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/stats/resolver" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" "github.com/aws/aws-sdk-go/aws" "github.com/docker/docker/api/types" diff --git a/agent/tcs/handler/handler.go b/agent/tcs/handler/handler.go index 1d378573380..97e9db5de50 100644 --- a/agent/tcs/handler/handler.go +++ b/agent/tcs/handler/handler.go @@ -22,9 +22,9 @@ import ( "github.com/aws/amazon-ecs-agent/agent/config" "github.com/aws/amazon-ecs-agent/agent/engine" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/version" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" tcsclient "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/client" "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" diff --git a/agent/tcs/handler/handler_test.go b/agent/tcs/handler/handler_test.go index 0c3caa51b28..2365b0a793f 100644 --- a/agent/tcs/handler/handler_test.go +++ b/agent/tcs/handler/handler_test.go @@ -32,10 +32,10 @@ import ( mock_api "github.com/aws/amazon-ecs-agent/agent/api/mocks" "github.com/aws/amazon-ecs-agent/agent/config" mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/stats" "github.com/aws/amazon-ecs-agent/agent/version" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" tcsclient "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/client" "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" diff --git a/agent/tcs/handler/types.go b/agent/tcs/handler/types.go index 260dfae5167..2666f945611 100644 --- a/agent/tcs/handler/types.go +++ b/agent/tcs/handler/types.go @@ -20,9 +20,9 @@ import ( "github.com/aws/amazon-ecs-agent/agent/api" "github.com/aws/amazon-ecs-agent/agent/config" "github.com/aws/amazon-ecs-agent/agent/engine" - "github.com/aws/amazon-ecs-agent/agent/eventstream" "github.com/aws/amazon-ecs-agent/agent/stats" "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime" "github.com/aws/aws-sdk-go/aws/credentials" diff --git a/agent/eventstream/eventstream.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/eventstream/eventstream.go similarity index 96% rename from agent/eventstream/eventstream.go rename to agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/eventstream/eventstream.go index 8dac2a665e4..90c13105912 100644 --- a/agent/eventstream/eventstream.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/eventstream/eventstream.go @@ -11,8 +11,6 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -// Package handler deals with appropriately reacting to all ACS messages as well -// as maintaining the connection to ACS. package eventstream import ( diff --git a/agent/vendor/modules.txt b/agent/vendor/modules.txt index 1bd7261da54..f3f99ee2615 100644 --- a/agent/vendor/modules.txt +++ b/agent/vendor/modules.txt @@ -18,6 +18,7 @@ github.com/aws/amazon-ecs-agent/ecs-agent/api/status github.com/aws/amazon-ecs-agent/ecs-agent/credentials github.com/aws/amazon-ecs-agent/ecs-agent/credentials/mocks github.com/aws/amazon-ecs-agent/ecs-agent/doctor +github.com/aws/amazon-ecs-agent/ecs-agent/eventstream github.com/aws/amazon-ecs-agent/ecs-agent/logger github.com/aws/amazon-ecs-agent/ecs-agent/logger/audit github.com/aws/amazon-ecs-agent/ecs-agent/logger/audit/mocks diff --git a/agent/wsclient/client.go b/agent/wsclient/client.go deleted file mode 100644 index b5541dba061..00000000000 --- a/agent/wsclient/client.go +++ /dev/null @@ -1,489 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -// Package wsclient wraps the generated aws-sdk-go client to provide marshalling -// and unmarshalling of data over a websocket connection in the format expected -// by backend. It allows for bidirectional communication and acts as both a -// client-and-server in terms of requests, but only as a client in terms of -// connecting. -package wsclient - -import ( - "context" - "crypto/tls" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "net/url" - "os" - "reflect" - "strings" - "sync" - "time" - - "github.com/aws/amazon-ecs-agent/agent/config" - "github.com/aws/amazon-ecs-agent/agent/wsclient/wsconn" - "github.com/aws/amazon-ecs-agent/ecs-agent/logger" - "github.com/aws/amazon-ecs-agent/ecs-agent/utils" - "github.com/aws/amazon-ecs-agent/ecs-agent/utils/cipher" - "github.com/aws/amazon-ecs-agent/ecs-agent/utils/httpproxy" - - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil" - "github.com/cihub/seelog" - "github.com/gorilla/websocket" - "github.com/pkg/errors" -) - -const ( - // ServiceName defines the service name for the agent. This is used to sign messages - // that are sent to the backend. - ServiceName = "ecs" - - // wsConnectTimeout specifies the default connection timeout to the backend. - wsConnectTimeout = 30 * time.Second - - // wsHandshakeTimeout specifies the default handshake timeout for the websocket client - wsHandshakeTimeout = wsConnectTimeout - - // readBufSize is the size of the read buffer for the ws connection. - readBufSize = 4096 - - // writeBufSize is the size of the write buffer for the ws connection. - writeBufSize = 32768 - - // Default NO_PROXY env var IP addresses - defaultNoProxyIP = "169.254.169.254,169.254.170.2" - - errClosed = "use of closed network connection" -) - -// ReceivedMessage is the intermediate message used to unmarshal a -// message from backend -type ReceivedMessage struct { - Type string `json:"type"` - Message json.RawMessage `json:"message"` -} - -// RequestMessage is the intermediate message marshalled to send to backend. -type RequestMessage struct { - Type string `json:"type"` - Message json.RawMessage `json:"message"` -} - -// RequestHandler would be func(*ecsacs.T for T in ecsacs.*) to be more proper, but it needs -// to be interface{} to properly capture that -type RequestHandler interface{} - -// ClientServer is a combined client and server for the backend websocket connection -type ClientServer interface { - AddRequestHandler(RequestHandler) - // SetAnyRequestHandler takes a function with the signature 'func(i - // interface{})' and calls it with every message the server passes down. - // Only a single 'AnyRequestHandler' will be active at a given time for a - // ClientServer - SetAnyRequestHandler(RequestHandler) - MakeRequest(input interface{}) error - WriteMessage(input []byte) error - WriteCloseMessage() error - Connect() error - IsConnected() bool - SetConnection(conn wsconn.WebsocketConn) - Disconnect(...interface{}) error - Serve() error - SetReadDeadline(t time.Time) error - io.Closer -} - -// ClientServerImpl wraps commonly used methods defined in ClientServer interface. -type ClientServerImpl struct { - // AgentConfig is the user-specified runtime configuration - AgentConfig *config.Config - // conn holds the underlying low-level websocket connection - conn wsconn.WebsocketConn - // CredentialProvider is used to retrieve AWS credentials - CredentialProvider *credentials.Credentials - // RequestHandlers is a map from message types to handler functions of the - // form: - // "FooMessage": func(message *ecsacs.FooMessage) - RequestHandlers map[string]RequestHandler - // AnyRequestHandler is a request handler that, if set, is called on every - // message with said message. It will be called before a RequestHandler is - // called. It must take a single interface{} argument. - AnyRequestHandler RequestHandler - // MakeRequestHook is an optional callback that, if set, is called on every - // generated request with the raw request body. - MakeRequestHook MakeRequestHookFunc - // URL is the full url to the backend, including path, querystring, and so on. - URL string - // RWTimeout is the duration used for setting read and write deadlines - // for the websocket connection - RWTimeout time.Duration - // writeLock needed to ensure that only one routine is writing to the socket - writeLock sync.RWMutex - ClientServer - ServiceError - TypeDecoder -} - -// MakeRequestHookFunc is a function that is invoked on every generated request -// with the raw request body. MakeRequestHookFunc must return either the body -// to send or an error. -type MakeRequestHookFunc func([]byte) ([]byte, error) - -// Connect opens a connection to the backend and upgrades it to a websocket. Calls to -// 'MakeRequest' can be made after calling this, but responses will not be -// receivable until 'Serve' is also called. -func (cs *ClientServerImpl) Connect() error { - logger.Info("Establishing a Websocket connection", logger.Fields{ - "url": cs.URL, - }) - parsedURL, err := url.Parse(cs.URL) - if err != nil { - return err - } - - wsScheme, err := websocketScheme(parsedURL.Scheme) - if err != nil { - return err - } - parsedURL.Scheme = wsScheme - - // NewRequest never returns an error if the url parses and we just verified - // it did above - request, _ := http.NewRequest("GET", parsedURL.String(), nil) - - // Sign the request; we'll send its headers via the websocket client which includes the signature - err = utils.SignHTTPRequest(request, cs.AgentConfig.AWSRegion, ServiceName, cs.CredentialProvider, nil) - if err != nil { - return err - } - - timeoutDialer := &net.Dialer{Timeout: wsConnectTimeout} - tlsConfig := &tls.Config{ServerName: parsedURL.Host, InsecureSkipVerify: cs.AgentConfig.AcceptInsecureCert} - cipher.WithSupportedCipherSuites(tlsConfig) - - // Ensure that NO_PROXY gets set - noProxy := os.Getenv("NO_PROXY") - if noProxy == "" { - dockerHost, err := url.Parse(cs.AgentConfig.DockerEndpoint) - if err == nil { - dockerHost.Scheme = "" - os.Setenv("NO_PROXY", fmt.Sprintf("%s,%s", defaultNoProxyIP, dockerHost.String())) - seelog.Info("NO_PROXY set:", os.Getenv("NO_PROXY")) - } else { - seelog.Errorf("NO_PROXY unable to be set: the configured Docker endpoint is invalid.") - } - } - - dialer := websocket.Dialer{ - ReadBufferSize: readBufSize, - WriteBufferSize: writeBufSize, - TLSClientConfig: tlsConfig, - Proxy: httpproxy.Proxy, - NetDial: timeoutDialer.Dial, - HandshakeTimeout: wsHandshakeTimeout, - } - - websocketConn, httpResponse, err := dialer.Dial(parsedURL.String(), request.Header) - if httpResponse != nil { - defer httpResponse.Body.Close() - } - - if err != nil { - var resp []byte - if httpResponse != nil { - var readErr error - resp, readErr = ioutil.ReadAll(httpResponse.Body) - if readErr != nil { - return fmt.Errorf("Unable to read websocket connection: " + readErr.Error() + ", " + err.Error()) - } - // If there's a response, we can try to unmarshal it into one of the - // modeled error types - possibleError, _, decodeErr := DecodeData(resp, cs.TypeDecoder) - if decodeErr == nil { - return cs.NewError(possibleError) - } - } - seelog.Warnf("Error creating a websocket client: %v", err) - return errors.Wrapf(err, "websocket client: unable to dial %s response: %s", - parsedURL.Host, string(resp)) - } - - cs.writeLock.Lock() - defer cs.writeLock.Unlock() - - cs.conn = websocketConn - seelog.Debugf("Established a Websocket connection to %s", cs.URL) - return nil -} - -// IsReady gives a boolean response that informs the caller if the websocket -// connection is fully established. -func (cs *ClientServerImpl) IsReady() bool { - cs.writeLock.RLock() - defer cs.writeLock.RUnlock() - - return cs.conn != nil -} - -// SetConnection passes a websocket connection object into the client. This is used only in -// testing and should be avoided in non-test code. -func (cs *ClientServerImpl) SetConnection(conn wsconn.WebsocketConn) { - cs.conn = conn -} - -// SetReadDeadline sets the read deadline for the websocket connection -// A read timeout results in an io error if there are any outstanding reads -// that exceed the deadline -func (cs *ClientServerImpl) SetReadDeadline(t time.Time) error { - err := cs.conn.SetReadDeadline(t) - if err == nil { - return nil - } - seelog.Warnf("Unable to set read deadline for websocket connection: %v for %s", err, cs.URL) - // If we get connection closed error from SetReadDeadline, break out of the for loop and - // return an error - if opErr, ok := err.(*net.OpError); ok && strings.Contains(opErr.Err.Error(), errClosed) { - seelog.Errorf("Stopping redundant reads on closed network connection: %s", cs.URL) - return opErr - } - // An unhandled error has occurred while trying to extend read deadline. - // Try asynchronously closing the connection. We don't want to be blocked on stale connections - // taking too long to close. The flip side is that we might start accumulating stale connections. - // But, that still seems more desirable than waiting for ever for the connection to close - cs.forceCloseConnection() - return err -} - -func (cs *ClientServerImpl) forceCloseConnection() { - closeChan := make(chan error, 1) - go func() { - closeChan <- cs.Close() - }() - ctx, cancel := context.WithTimeout(context.TODO(), wsConnectTimeout) - defer cancel() - select { - case closeErr := <-closeChan: - if closeErr != nil { - seelog.Warnf("Unable to close websocket connection: %v for %s", - closeErr, cs.URL) - } - case <-ctx.Done(): - if ctx.Err() != nil { - seelog.Warnf("Context canceled waiting for termination of websocket connection: %v for %s", - ctx.Err(), cs.URL) - } - } -} - -// Disconnect disconnects the connection -func (cs *ClientServerImpl) Disconnect(...interface{}) error { - cs.writeLock.Lock() - defer cs.writeLock.Unlock() - - if cs.conn == nil { - return fmt.Errorf("websocker client: no connection to close") - } - - // Close() in turn results in a an internal flushFrame() call in gorilla - // as the close frame needs to be sent to the server. Set the deadline - // for that as well. - if err := cs.conn.SetWriteDeadline(time.Now().Add(cs.RWTimeout)); err != nil { - seelog.Warnf("Unable to set write deadline for websocket connection: %v for %s", err, cs.URL) - } - return cs.conn.Close() -} - -// AddRequestHandler adds a request handler to this client. -// A request handler *must* be a function taking a single argument, and that -// argument *must* be a pointer to a recognized 'ecsacs' struct. -// E.g. if you desired to handle messages from acs of type 'FooMessage', you -// would pass the following handler in: -// -// func(message *ecsacs.FooMessage) -// -// This function will panic if the passed in function does not have one pointer -// argument or the argument is not a recognized type. -// Additionally, the request handler will block processing of further messages -// on this connection so it's important that it return quickly. -func (cs *ClientServerImpl) AddRequestHandler(f RequestHandler) { - firstArg := reflect.TypeOf(f).In(0) - firstArgTypeStr := firstArg.Elem().Name() - recognizedTypes := cs.GetRecognizedTypes() - _, ok := recognizedTypes[firstArgTypeStr] - if !ok { - panic("AddRequestHandler called with invalid function; argument type not recognized: " + firstArgTypeStr) - } - cs.RequestHandlers[firstArgTypeStr] = f -} - -// SetAnyRequestHandler passes a RequestHandler object into the client. -func (cs *ClientServerImpl) SetAnyRequestHandler(f RequestHandler) { - cs.AnyRequestHandler = f -} - -// MakeRequest makes a request using the given input. Note, the input *MUST* be -// a pointer to a valid backend type that this client recognises -func (cs *ClientServerImpl) MakeRequest(input interface{}) error { - send, err := cs.CreateRequestMessage(input) - if err != nil { - return err - } - - if cs.MakeRequestHook != nil { - send, err = cs.MakeRequestHook(send) - if err != nil { - return err - } - } - - // Over the wire we send something like - // {"type":"AckRequest","message":{"messageId":"xyz"}} - return cs.WriteMessage(send) -} - -// WriteMessage wraps the low level websocket write method with a lock -func (cs *ClientServerImpl) WriteMessage(send []byte) error { - cs.writeLock.Lock() - defer cs.writeLock.Unlock() - - // This is just future proofing. Ignore the error as the gorilla websocket - // library returns 'nil' anyway for SetWriteDeadline - // https://github.com/gorilla/websocket/blob/4201258b820c74ac8e6922fc9e6b52f71fe46f8d/conn.go#L761 - if err := cs.conn.SetWriteDeadline(time.Now().Add(cs.RWTimeout)); err != nil { - seelog.Warnf("Unable to set write deadline for websocket connection: %v for %s", err, cs.URL) - } - - return cs.conn.WriteMessage(websocket.TextMessage, send) -} - -// WriteCloseMessage wraps the low level websocket WriteControl method with a lock, and sends a message of type -// CloseMessage (Ref: https://github.com/gorilla/websocket/blob/9111bb834a68b893cebbbaed5060bdbc1d9ab7d2/conn.go#L74) -func (cs *ClientServerImpl) WriteCloseMessage() error { - cs.writeLock.Lock() - defer cs.writeLock.Unlock() - - send := websocket.FormatCloseMessage(websocket.CloseNormalClosure, - "ConnectionExpired: Reconnect to continue") - - return cs.conn.WriteControl(websocket.CloseMessage, send, time.Now().Add(cs.RWTimeout)) -} - -// ConsumeMessages reads messages from the websocket connection and handles read -// messages from an active connection. -func (cs *ClientServerImpl) ConsumeMessages() error { - for { - if err := cs.SetReadDeadline(time.Now().Add(cs.RWTimeout)); err != nil { - return err - } - messageType, message, err := cs.conn.ReadMessage() - - switch { - case err == nil: - if messageType != websocket.TextMessage { - // maybe not fatal though, we'll try to process it anyways - seelog.Errorf("Unexpected messageType: %v", messageType) - } - cs.handleMessage(message) - - case permissibleCloseCode(err): - seelog.Debugf("Connection closed for a valid reason: %s", err) - return io.EOF - - default: - // Unexpected error occurred - seelog.Debugf("Error getting message from ws backend: error: [%v], messageType: [%v] ", - err, messageType) - return err - } - - } -} - -// CreateRequestMessage creates the request json message using the given input. -// Note, the input *MUST* be a pointer to a valid backend type that this -// client recognises. -func (cs *ClientServerImpl) CreateRequestMessage(input interface{}) ([]byte, error) { - msg := &RequestMessage{} - - recognizedTypes := cs.GetRecognizedTypes() - for typeStr, typeVal := range recognizedTypes { - if reflect.TypeOf(input) == reflect.PtrTo(typeVal) { - msg.Type = typeStr - break - } - } - if msg.Type == "" { - return nil, &UnrecognizedWSRequestType{reflect.TypeOf(input).String()} - } - messageData, err := jsonutil.BuildJSON(input) - if err != nil { - return nil, &NotMarshallableWSRequest{msg.Type, err} - } - msg.Message = json.RawMessage(messageData) - - send, err := json.Marshal(msg) - if err != nil { - return nil, &NotMarshallableWSRequest{msg.Type, err} - } - return send, nil -} - -// handleMessage dispatches a message to the correct 'requestHandler' for its -// type. If no request handler is found, the message is discarded. -func (cs *ClientServerImpl) handleMessage(data []byte) { - typedMessage, typeStr, err := DecodeData(data, cs.TypeDecoder) - if err != nil { - seelog.Warnf("Unable to handle message from backend: %v", err) - return - } - - seelog.Debugf("Received message of type: %s", typeStr) - - if cs.AnyRequestHandler != nil { - reflect.ValueOf(cs.AnyRequestHandler).Call([]reflect.Value{reflect.ValueOf(typedMessage)}) - } - - if handler, ok := cs.RequestHandlers[typeStr]; ok { - reflect.ValueOf(handler).Call([]reflect.Value{reflect.ValueOf(typedMessage)}) - } else { - seelog.Infof("No handler for message type: %s %s", typeStr, typedMessage) - } -} - -func websocketScheme(httpScheme string) (string, error) { - // gorilla/websocket expects the websocket scheme (ws[s]://) - var wsScheme string - switch httpScheme { - case "http": - wsScheme = "ws" - case "https": - wsScheme = "wss" - default: - return "", fmt.Errorf("wsclient: unknown scheme %s", httpScheme) - } - return wsScheme, nil -} - -// See https://github.com/gorilla/websocket/blob/87f6f6a22ebfbc3f89b9ccdc7fddd1b914c095f9/conn.go#L650 -func permissibleCloseCode(err error) bool { - return websocket.IsCloseError(err, - websocket.CloseNormalClosure, // websocket error code 1000 - websocket.CloseAbnormalClosure, // websocket error code 1006 - websocket.CloseGoingAway, // websocket error code 1001 - websocket.CloseInternalServerErr) // websocket error code 1011 -} diff --git a/agent/wsclient/client_test.go b/agent/wsclient/client_test.go deleted file mode 100644 index 428d4474fa9..00000000000 --- a/agent/wsclient/client_test.go +++ /dev/null @@ -1,282 +0,0 @@ -//go:build unit -// +build unit - -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package wsclient - -import ( - "errors" - "io" - "net" - "net/url" - "os" - "strings" - "sync" - "testing" - "time" - - "github.com/aws/amazon-ecs-agent/agent/config" - "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" - "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/mock/utils" - mock_wsconn "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/wsconn/mock" - "github.com/golang/mock/gomock" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - - "github.com/gorilla/websocket" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const dockerEndpoint = "/var/run/docker.sock" - -// Close closes the underlying connection. Implement Close() in this file -// as ClientServerImpl doesn't implement it. This is needed by the -// TestSetReadDeadline* tests -func (cs *ClientServerImpl) Close() error { - return cs.Disconnect() -} - -func TestClientProxy(t *testing.T) { - proxy_url := "127.0.0.1:1234" - os.Setenv("HTTP_PROXY", proxy_url) - defer os.Unsetenv("HTTP_PROXY") - - cs := getClientServer("http://www.amazon.com") - err := cs.Connect() - assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), proxy_url), "proxy not found: %s", err.Error()) -} - -// TestConcurrentWritesDontPanic will force a panic in the websocket library if -// the implemented methods don't handle concurrency correctly -// See https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency -func TestConcurrentWritesDontPanic(t *testing.T) { - closeWS := make(chan []byte) - defer close(closeWS) - - mockServer, _, requests, _, _ := utils.GetMockServer(closeWS) - mockServer.StartTLS() - defer mockServer.Close() - - var waitForRequests sync.WaitGroup - waitForRequests.Add(1) - - go func() { - for i := 0; i < 20; i++ { - <-requests - } - waitForRequests.Done() - }() - req := ecsacs.AckRequest{Cluster: aws.String("test"), ContainerInstance: aws.String("test"), MessageId: aws.String("test")} - - cs := getClientServer(mockServer.URL) - require.NoError(t, cs.Connect()) - - executeTenRequests := func() { - for i := 0; i < 10; i++ { - assert.NoError(t, cs.MakeRequest(&req)) - } - } - - // Make requests from two separate routines to try and force a - // concurrent write - go executeTenRequests() - go executeTenRequests() - - t.Log("Waiting for all 20 requests to succeed") - waitForRequests.Wait() -} - -func getClientServer(url string) *ClientServerImpl { - types := []interface{}{ecsacs.AckRequest{}} - testCreds := credentials.NewStaticCredentials("test-id", "test-secret", "test-token") - - return &ClientServerImpl{ - URL: url, - AgentConfig: &config.Config{ - AcceptInsecureCert: true, - AWSRegion: "us-east-1", - DockerEndpoint: "unix://" + dockerEndpoint, - }, - CredentialProvider: testCreds, - TypeDecoder: BuildTypeDecoder(types), - RWTimeout: time.Second, - } -} - -// TestProxyVariableCustomValue ensures that a user is able to override the -// proxy variable by setting an environment variable -func TestProxyVariableCustomValue(t *testing.T) { - closeWS := make(chan []byte) - defer close(closeWS) - - mockServer, _, _, _, _ := utils.GetMockServer(closeWS) - mockServer.StartTLS() - defer mockServer.Close() - - testString := "Custom no proxy string" - os.Setenv("NO_PROXY", testString) - require.NoError(t, getClientServer(mockServer.URL).Connect()) - - assert.Equal(t, os.Getenv("NO_PROXY"), testString, "NO_PROXY should match user-supplied variable") -} - -// TestProxyVariableDefaultValue verifies that NO_PROXY gets overridden if it -// isn't already set -func TestProxyVariableDefaultValue(t *testing.T) { - closeWS := make(chan []byte) - defer close(closeWS) - - mockServer, _, _, _, _ := utils.GetMockServer(closeWS) - mockServer.StartTLS() - defer mockServer.Close() - - os.Unsetenv("NO_PROXY") - getClientServer(mockServer.URL).Connect() - - expectedEnvVar := "169.254.169.254,169.254.170.2," + dockerEndpoint - - assert.Equal(t, os.Getenv("NO_PROXY"), expectedEnvVar, "Variable NO_PROXY expected to be overwritten when no default value supplied") -} - -// TestHandleMessagePermissibleCloseCode ensures that permissible close codes -// are wrapped in io.EOF -func TestHandleMessagePermissibleCloseCode(t *testing.T) { - closeWS := make(chan []byte) - defer close(closeWS) - - messageError := make(chan error) - mockServer, _, _, _, _ := utils.GetMockServer(closeWS) - mockServer.StartTLS() - cs := getClientServer(mockServer.URL) - require.NoError(t, cs.Connect()) - - go func() { - messageError <- cs.ConsumeMessages() - }() - - closeWS <- websocket.FormatCloseMessage(websocket.CloseNormalClosure, ":)") - assert.EqualError(t, <-messageError, io.EOF.Error(), "expected EOF for normal close code") -} - -// TestHandleMessageUnexpectedCloseCode checks that unexpected close codes will -// be returned as is (not wrapped in io.EOF) -func TestHandleMessageUnexpectedCloseCode(t *testing.T) { - closeWS := make(chan []byte) - defer close(closeWS) - - messageError := make(chan error) - mockServer, _, _, _, _ := utils.GetMockServer(closeWS) - mockServer.StartTLS() - cs := getClientServer(mockServer.URL) - require.NoError(t, cs.Connect()) - - go func() { - messageError <- cs.ConsumeMessages() - }() - - closeWS <- websocket.FormatCloseMessage(websocket.CloseTryAgainLater, ":(") - assert.True(t, websocket.IsCloseError(<-messageError, websocket.CloseTryAgainLater), "Expected error from websocket library") -} - -// TestHandlNonHTTPSEndpoint verifies that the wsclient can handle communication over -// an HTTP (so WS) connection -func TestHandleNonHTTPSEndpoint(t *testing.T) { - closeWS := make(chan []byte) - defer close(closeWS) - - mockServer, _, requests, _, _ := utils.GetMockServer(closeWS) - mockServer.Start() - defer mockServer.Close() - - cs := getClientServer(mockServer.URL) - require.NoError(t, cs.Connect()) - - req := ecsacs.AckRequest{Cluster: aws.String("test"), ContainerInstance: aws.String("test"), MessageId: aws.String("test")} - cs.MakeRequest(&req) - - t.Log("Waiting for single request to be visible server-side") - <-requests -} - -// TestHandleIncorrectHttpScheme checks that an incorrect URL scheme results in -// an error -func TestHandleIncorrectURLScheme(t *testing.T) { - closeWS := make(chan []byte) - defer close(closeWS) - - mockServer, _, _, _, _ := utils.GetMockServer(closeWS) - mockServer.StartTLS() - defer mockServer.Close() - - mockServerURL, _ := url.Parse(mockServer.URL) - mockServerURL.Scheme = "notaparticularlyrealscheme" - - cs := getClientServer(mockServerURL.String()) - err := cs.Connect() - - assert.Error(t, err, "Expected error for incorrect URL scheme") -} - -// TestWebsocketScheme checks that websocketScheme handles valid and invalid mappings -// correctly -func TestWebsocketScheme(t *testing.T) { - // test valid schemes - validMappings := map[string]string{ - "http": "ws", - "https": "wss", - } - - for input, expectedOutput := range validMappings { - actualOutput, err := websocketScheme(input) - - assert.NoError(t, err, "Unexpected error for valid http scheme") - assert.Equal(t, actualOutput, expectedOutput, "Valid http schemes should map to a websocket scheme") - } - - // test an invalid mapping - _, err := websocketScheme("highly-likely-to-be-junk") - assert.Error(t, err, "Expected error for invalid http scheme") -} - -func TestSetReadDeadlineClosedConnection(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - conn := mock_wsconn.NewMockWebsocketConn(ctrl) - cs := &ClientServerImpl{conn: conn} - - opErr := &net.OpError{Err: errors.New(errClosed)} - conn.EXPECT().SetReadDeadline(gomock.Any()).Return(opErr) - assert.EqualError(t, cs.ConsumeMessages(), opErr.Error()) -} - -func TestSetReadDeadlineError(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - conn := mock_wsconn.NewMockWebsocketConn(ctrl) - cs := &ClientServerImpl{conn: conn} - - gomock.InOrder( - conn.EXPECT().SetReadDeadline(gomock.Any()).Return(errors.New("error")), - conn.EXPECT().SetWriteDeadline(gomock.Any()).Return(nil), - conn.EXPECT().Close().Return(nil), - ) - assert.Error(t, cs.ConsumeMessages()) -} diff --git a/agent/wsclient/decode.go b/agent/wsclient/decode.go deleted file mode 100644 index b65f827452f..00000000000 --- a/agent/wsclient/decode.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -// Package wsclient wraps the generated aws-sdk-go client to provide marshalling -// and unmarshalling of data over a websocket connection in the format expected -// by backend. It allows for bidirectional communication and acts as both a -// client-and-server in terms of requests, but only as a client in terms of -// connecting. -package wsclient - -import ( - "bytes" - "encoding/json" - "reflect" - - "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil" -) - -// DecodeData decodes a raw message into its type. E.g. An ACS message of the -// form {"type":"FooMessage","message":{"foo":1}} will be decoded into the -// corresponding *ecsacs.FooMessage type. The type string, "FooMessage", will -// also be returned as a convenience. -func DecodeData(data []byte, dec TypeDecoder) (interface{}, string, error) { - raw := &ReceivedMessage{} - // Delay unmarshal until we know the type - err := json.Unmarshal(data, raw) - if err != nil || raw.Type == "" { - // Unframed messages can be of the {"Type":"Message"} form as well, try - // that. - connErr, connErrType, decodeErr := DecodeConnectionError(data, dec) - if decodeErr == nil && connErrType != "" { - return connErr, connErrType, nil - } - return nil, "", decodeErr - } - - reqMessage, ok := dec.NewOfType(raw.Type) - if !ok { - return nil, raw.Type, &UnrecognizedWSRequestType{raw.Type} - } - err = jsonutil.UnmarshalJSON(reqMessage, bytes.NewReader(raw.Message)) - return reqMessage, raw.Type, err -} - -// DecodeConnectionError decodes some of the connection errors returned by the -// backend. Some differ from the usual ones in that they do not have a 'type' -// and 'message' field, but rather are of the form {"ErrorType":"ErrorMessage"} -func DecodeConnectionError(data []byte, dec TypeDecoder) (interface{}, string, error) { - var acsErr map[string]string - err := json.Unmarshal(data, &acsErr) - if err != nil { - return nil, "", &UndecodableMessage{string(data)} - } - if len(acsErr) != 1 { - return nil, "", &UndecodableMessage{string(data)} - } - var typeStr string - for key := range acsErr { - typeStr = key - } - errType, ok := dec.NewOfType(typeStr) - if !ok { - return nil, typeStr, &UnrecognizedWSRequestType{} - } - - val := reflect.ValueOf(errType) - if val.Kind() != reflect.Ptr { - return nil, typeStr, &UnrecognizedWSRequestType{"Non-pointer kind: " + val.Kind().String()} - } - ret := reflect.New(val.Elem().Type()) - retObj := ret.Elem() - - if retObj.Kind() != reflect.Struct { - return nil, typeStr, &UnrecognizedWSRequestType{"Pointer to non-struct kind: " + retObj.Kind().String()} - } - - msgField := retObj.FieldByName("Message_") - if !msgField.IsValid() { - return nil, typeStr, &UnrecognizedWSRequestType{"Expected error type to have 'Message' field"} - } - if msgField.IsValid() && msgField.CanSet() { - msgStr := acsErr[typeStr] - msgStrVal := reflect.ValueOf(&msgStr) - if !msgStrVal.Type().AssignableTo(msgField.Type()) { - return nil, typeStr, &UnrecognizedWSRequestType{"Type mismatch; 'Message' field must be a *string"} - } - msgField.Set(msgStrVal) - return ret.Interface(), typeStr, nil - } - return nil, typeStr, &UnrecognizedWSRequestType{"Invalid message field; must not be nil"} -} diff --git a/agent/wsclient/error.go b/agent/wsclient/error.go deleted file mode 100644 index d89cade8967..00000000000 --- a/agent/wsclient/error.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package wsclient - -import "reflect" - -// WSUnretriableErrors defines methods to retrieve the list of unretriable -// errors. -type WSUnretriableErrors interface { - Get() []interface{} -} - -// ServiceError defines methods to return new backend specific error objects. -type ServiceError interface { - NewError(err interface{}) *WSError -} - -// WSError wraps all the typed errors that the backend may return -// This will not be needed once the aws-sdk-go generation handles error types -// more cleanly -type WSError struct { - ErrObj interface{} - Type string - WSUnretriableErrors -} - -// Error returns an error string -func (err *WSError) Error() string { - val := reflect.ValueOf(err.ErrObj) - if val.Kind() == reflect.Ptr { - val = val.Elem() - } - var typeStr = "Unknown type" - if val.IsValid() { - typeStr = val.Type().Name() - msg := val.FieldByName("Message_") - if msg.IsValid() && msg.CanInterface() { - str, ok := msg.Interface().(*string) - if ok { - if str == nil { - return typeStr + ": null" - } - return typeStr + ": " + *str - } - } - } - - if asErr, ok := err.ErrObj.(error); ok { - return err.Type + ": " + asErr.Error() - } - return err.Type + ": Unknown error (" + typeStr + ")" -} - -// Retry returns true if this error should be considered retriable -func (err *WSError) Retry() bool { - for _, unretriable := range err.Get() { - if reflect.TypeOf(err.ErrObj) == reflect.TypeOf(unretriable) { - return false - } - } - return true -} diff --git a/agent/wsclient/errors.go b/agent/wsclient/errors.go deleted file mode 100644 index e86e342d0ac..00000000000 --- a/agent/wsclient/errors.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package wsclient - -// UnrecognizedWSRequestType specifies that a given type is not recognized. -// This error is not retriable. -type UnrecognizedWSRequestType struct { - Type string -} - -// Error implements error -func (u *UnrecognizedWSRequestType) Error() string { - return "Could not recognize given argument as a valid type: " + u.Type -} - -// Retry implements Retriable -func (u *UnrecognizedWSRequestType) Retry() bool { - return false -} - -// NotMarshallableWSRequest represents that the given request input could not be -// marshalled -type NotMarshallableWSRequest struct { - Type string - - Err error -} - -// Retry implements Retriable -func (u *NotMarshallableWSRequest) Retry() bool { - return false -} - -// Error implements error -func (u *NotMarshallableWSRequest) Error() string { - ret := "Could not marshal Request" - if u.Type != "" { - ret += " (" + u.Type + ")" - } - return ret + ": " + u.Err.Error() -} - -// UndecodableMessage indicates that a message from the backend could not be decoded -type UndecodableMessage struct { - Msg string -} - -func (u *UndecodableMessage) Error() string { - return "Could not decode message into any expected format: " + u.Msg -} diff --git a/agent/wsclient/generate_mocks.go b/agent/wsclient/generate_mocks.go deleted file mode 100644 index da333df744b..00000000000 --- a/agent/wsclient/generate_mocks.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package wsclient - -//go:generate mockgen -destination=mock/client.go -copyright_file=../../scripts/copyright_file github.com/aws/amazon-ecs-agent/agent/wsclient ClientServer diff --git a/agent/wsclient/mock/client.go b/agent/wsclient/mock/client.go deleted file mode 100644 index 9d8dcba7ad4..00000000000 --- a/agent/wsclient/mock/client.go +++ /dev/null @@ -1,217 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. -// - -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/aws/amazon-ecs-agent/agent/wsclient (interfaces: ClientServer) - -// Package mock_wsclient is a generated GoMock package. -package mock_wsclient - -import ( - reflect "reflect" - time "time" - - wsclient "github.com/aws/amazon-ecs-agent/agent/wsclient" - wsconn "github.com/aws/amazon-ecs-agent/agent/wsclient/wsconn" - gomock "github.com/golang/mock/gomock" -) - -// MockClientServer is a mock of ClientServer interface. -type MockClientServer struct { - ctrl *gomock.Controller - recorder *MockClientServerMockRecorder -} - -// MockClientServerMockRecorder is the mock recorder for MockClientServer. -type MockClientServerMockRecorder struct { - mock *MockClientServer -} - -// NewMockClientServer creates a new mock instance. -func NewMockClientServer(ctrl *gomock.Controller) *MockClientServer { - mock := &MockClientServer{ctrl: ctrl} - mock.recorder = &MockClientServerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockClientServer) EXPECT() *MockClientServerMockRecorder { - return m.recorder -} - -// AddRequestHandler mocks base method. -func (m *MockClientServer) AddRequestHandler(arg0 wsclient.RequestHandler) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "AddRequestHandler", arg0) -} - -// AddRequestHandler indicates an expected call of AddRequestHandler. -func (mr *MockClientServerMockRecorder) AddRequestHandler(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRequestHandler", reflect.TypeOf((*MockClientServer)(nil).AddRequestHandler), arg0) -} - -// Close mocks base method. -func (m *MockClientServer) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockClientServerMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClientServer)(nil).Close)) -} - -// Connect mocks base method. -func (m *MockClientServer) Connect() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Connect") - ret0, _ := ret[0].(error) - return ret0 -} - -// Connect indicates an expected call of Connect. -func (mr *MockClientServerMockRecorder) Connect() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockClientServer)(nil).Connect)) -} - -// Disconnect mocks base method. -func (m *MockClientServer) Disconnect(arg0 ...interface{}) error { - m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range arg0 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "Disconnect", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// Disconnect indicates an expected call of Disconnect. -func (mr *MockClientServerMockRecorder) Disconnect(arg0 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockClientServer)(nil).Disconnect), arg0...) -} - -// IsConnected mocks base method. -func (m *MockClientServer) IsConnected() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsConnected") - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsConnected indicates an expected call of IsConnected. -func (mr *MockClientServerMockRecorder) IsConnected() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsConnected", reflect.TypeOf((*MockClientServer)(nil).IsConnected)) -} - -// MakeRequest mocks base method. -func (m *MockClientServer) MakeRequest(arg0 interface{}) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MakeRequest", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// MakeRequest indicates an expected call of MakeRequest. -func (mr *MockClientServerMockRecorder) MakeRequest(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MakeRequest", reflect.TypeOf((*MockClientServer)(nil).MakeRequest), arg0) -} - -// Serve mocks base method. -func (m *MockClientServer) Serve() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Serve") - ret0, _ := ret[0].(error) - return ret0 -} - -// Serve indicates an expected call of Serve. -func (mr *MockClientServerMockRecorder) Serve() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockClientServer)(nil).Serve)) -} - -// SetAnyRequestHandler mocks base method. -func (m *MockClientServer) SetAnyRequestHandler(arg0 wsclient.RequestHandler) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetAnyRequestHandler", arg0) -} - -// SetAnyRequestHandler indicates an expected call of SetAnyRequestHandler. -func (mr *MockClientServerMockRecorder) SetAnyRequestHandler(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAnyRequestHandler", reflect.TypeOf((*MockClientServer)(nil).SetAnyRequestHandler), arg0) -} - -// SetConnection mocks base method. -func (m *MockClientServer) SetConnection(arg0 wsconn.WebsocketConn) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetConnection", arg0) -} - -// SetConnection indicates an expected call of SetConnection. -func (mr *MockClientServerMockRecorder) SetConnection(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetConnection", reflect.TypeOf((*MockClientServer)(nil).SetConnection), arg0) -} - -// SetReadDeadline mocks base method. -func (m *MockClientServer) SetReadDeadline(arg0 time.Time) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetReadDeadline", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetReadDeadline indicates an expected call of SetReadDeadline. -func (mr *MockClientServerMockRecorder) SetReadDeadline(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetReadDeadline", reflect.TypeOf((*MockClientServer)(nil).SetReadDeadline), arg0) -} - -// WriteCloseMessage mocks base method. -func (m *MockClientServer) WriteCloseMessage() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteCloseMessage") - ret0, _ := ret[0].(error) - return ret0 -} - -// WriteCloseMessage indicates an expected call of WriteCloseMessage. -func (mr *MockClientServerMockRecorder) WriteCloseMessage() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteCloseMessage", reflect.TypeOf((*MockClientServer)(nil).WriteCloseMessage)) -} - -// WriteMessage mocks base method. -func (m *MockClientServer) WriteMessage(arg0 []byte) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteMessage", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// WriteMessage indicates an expected call of WriteMessage. -func (mr *MockClientServerMockRecorder) WriteMessage(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteMessage", reflect.TypeOf((*MockClientServer)(nil).WriteMessage), arg0) -} diff --git a/agent/wsclient/mock/utils/utils.go b/agent/wsclient/mock/utils/utils.go deleted file mode 100644 index 59f12d5b971..00000000000 --- a/agent/wsclient/mock/utils/utils.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package utils - -import ( - "net/http" - "net/http/httptest" - "time" - - "github.com/gorilla/websocket" -) - -// GetMockServer returns a mock websocket server that can be started up as TLS or not. -// TODO replace with gomock -func GetMockServer(closeWS <-chan []byte) (*httptest.Server, chan<- string, <-chan string, <-chan error, error) { - serverChan := make(chan string) - requestsChan := make(chan string) - errChan := make(chan error) - stopListen := make(chan bool) - - upgrader := websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024} - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - go func() { - ws.WriteControl(websocket.CloseMessage, <-closeWS, time.Now().Add(time.Second)) - close(stopListen) - }() - if err != nil { - errChan <- err - } - go func() { - for { - select { - case <-stopListen: - return - default: - _, msg, err := ws.ReadMessage() - if err != nil { - errChan <- err - } else { - requestsChan <- string(msg) - } - } - } - }() - for str := range serverChan { - err := ws.WriteMessage(websocket.TextMessage, []byte(str)) - if err != nil { - errChan <- err - } - } - }) - - server := httptest.NewUnstartedServer(handler) - return server, serverChan, requestsChan, errChan, nil -} diff --git a/agent/wsclient/types.go b/agent/wsclient/types.go deleted file mode 100644 index 5bb29606792..00000000000 --- a/agent/wsclient/types.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package wsclient - -import "reflect" - -// TypeDecoder interface defines methods to decode ecs types. -type TypeDecoder interface { - // NewOfType returns an object of a recognized type for a given type name. - // It additionally returns a boolean value which is set to false for an - // unrecognized type. - NewOfType(string) (interface{}, bool) - - // GetRecognizedTypes returns a map of type-strings (as passed in acs/tcs messages as - // the 'type' field) to a pointer to the corresponding struct type this type should - // be marshalled/unmarshalled to/from. - GetRecognizedTypes() map[string]reflect.Type -} - -// TypeDecoderImpl is an implementation for general use between ACS and -// TCS clients -type TypeDecoderImpl struct { - typeMappings map[string]reflect.Type -} - -// BuildTypeDecoder takes a list of interfaces and stores them internally as a -// list of typeMappings in the format below. -// "MyMessage": TypeOf(ecstcs.MyMessage) -func BuildTypeDecoder(recognizedTypes []interface{}) TypeDecoder { - typeMappings := make(map[string]reflect.Type) - for _, recognizedType := range recognizedTypes { - typeMappings[reflect.TypeOf(recognizedType).Name()] = reflect.TypeOf(recognizedType) - } - - return &TypeDecoderImpl{typeMappings: typeMappings} -} - -func (d *TypeDecoderImpl) NewOfType(typeString string) (interface{}, bool) { - rtype, ok := d.typeMappings[typeString] - if !ok { - return nil, false - } - return reflect.New(rtype).Interface(), true -} - -func (d *TypeDecoderImpl) GetRecognizedTypes() map[string]reflect.Type { - return d.typeMappings -} diff --git a/agent/wsclient/wsconn/conn.go b/agent/wsclient/wsconn/conn.go deleted file mode 100644 index af29a55c93c..00000000000 --- a/agent/wsclient/wsconn/conn.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package wsconn - -import "time" - -// WebsocketConn specifies the subset of gorilla/websocket's -// connection's methods that this client uses. -type WebsocketConn interface { - WriteMessage(messageType int, data []byte) error - WriteControl(messageType int, data []byte, deadline time.Time) error - ReadMessage() (messageType int, data []byte, err error) - Close() error - SetWriteDeadline(t time.Time) error - SetReadDeadline(t time.Time) error -} diff --git a/agent/wsclient/wsconn/generate_mocks.go b/agent/wsclient/wsconn/generate_mocks.go deleted file mode 100644 index 7601fee242d..00000000000 --- a/agent/wsclient/wsconn/generate_mocks.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package wsconn - -//go:generate mockgen -destination=mock/conn.go -copyright_file=../../../scripts/copyright_file github.com/aws/amazon-ecs-agent/agent/wsclient/wsconn WebsocketConn diff --git a/agent/wsclient/wsconn/mock/conn.go b/agent/wsclient/wsconn/mock/conn.go deleted file mode 100644 index b0c56bb4a65..00000000000 --- a/agent/wsclient/wsconn/mock/conn.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. -// - -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/aws/amazon-ecs-agent/agent/wsclient/wsconn (interfaces: WebsocketConn) - -// Package mock_wsconn is a generated GoMock package. -package mock_wsconn - -import ( - reflect "reflect" - time "time" - - gomock "github.com/golang/mock/gomock" -) - -// MockWebsocketConn is a mock of WebsocketConn interface. -type MockWebsocketConn struct { - ctrl *gomock.Controller - recorder *MockWebsocketConnMockRecorder -} - -// MockWebsocketConnMockRecorder is the mock recorder for MockWebsocketConn. -type MockWebsocketConnMockRecorder struct { - mock *MockWebsocketConn -} - -// NewMockWebsocketConn creates a new mock instance. -func NewMockWebsocketConn(ctrl *gomock.Controller) *MockWebsocketConn { - mock := &MockWebsocketConn{ctrl: ctrl} - mock.recorder = &MockWebsocketConnMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockWebsocketConn) EXPECT() *MockWebsocketConnMockRecorder { - return m.recorder -} - -// Close mocks base method. -func (m *MockWebsocketConn) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockWebsocketConnMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockWebsocketConn)(nil).Close)) -} - -// ReadMessage mocks base method. -func (m *MockWebsocketConn) ReadMessage() (int, []byte, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReadMessage") - ret0, _ := ret[0].(int) - ret1, _ := ret[1].([]byte) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// ReadMessage indicates an expected call of ReadMessage. -func (mr *MockWebsocketConnMockRecorder) ReadMessage() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadMessage", reflect.TypeOf((*MockWebsocketConn)(nil).ReadMessage)) -} - -// SetReadDeadline mocks base method. -func (m *MockWebsocketConn) SetReadDeadline(arg0 time.Time) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetReadDeadline", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetReadDeadline indicates an expected call of SetReadDeadline. -func (mr *MockWebsocketConnMockRecorder) SetReadDeadline(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetReadDeadline", reflect.TypeOf((*MockWebsocketConn)(nil).SetReadDeadline), arg0) -} - -// SetWriteDeadline mocks base method. -func (m *MockWebsocketConn) SetWriteDeadline(arg0 time.Time) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetWriteDeadline", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SetWriteDeadline indicates an expected call of SetWriteDeadline. -func (mr *MockWebsocketConnMockRecorder) SetWriteDeadline(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteDeadline", reflect.TypeOf((*MockWebsocketConn)(nil).SetWriteDeadline), arg0) -} - -// WriteControl mocks base method. -func (m *MockWebsocketConn) WriteControl(arg0 int, arg1 []byte, arg2 time.Time) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteControl", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 -} - -// WriteControl indicates an expected call of WriteControl. -func (mr *MockWebsocketConnMockRecorder) WriteControl(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteControl", reflect.TypeOf((*MockWebsocketConn)(nil).WriteControl), arg0, arg1, arg2) -} - -// WriteMessage mocks base method. -func (m *MockWebsocketConn) WriteMessage(arg0 int, arg1 []byte) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WriteMessage", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// WriteMessage indicates an expected call of WriteMessage. -func (mr *MockWebsocketConnMockRecorder) WriteMessage(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteMessage", reflect.TypeOf((*MockWebsocketConn)(nil).WriteMessage), arg0, arg1) -} diff --git a/ecs-agent/eventstream/eventstream.go b/ecs-agent/eventstream/eventstream.go new file mode 100644 index 00000000000..90c13105912 --- /dev/null +++ b/ecs-agent/eventstream/eventstream.go @@ -0,0 +1,130 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package eventstream + +import ( + "context" + "fmt" + "sync" + + "github.com/cihub/seelog" +) + +type eventHandler func(...interface{}) error + +// EventStream waiting for events and notifying the listeners by invoking +// the handler that listeners registered +type EventStream struct { + name string + open bool + event chan interface{} + handlers map[string]eventHandler + ctx context.Context + handlersLock sync.RWMutex + statusLock sync.RWMutex +} + +func NewEventStream(name string, ctx context.Context) *EventStream { + return &EventStream{ + name: name, + event: make(chan interface{}, 1), + ctx: ctx, + open: false, + handlers: make(map[string]eventHandler), + } +} + +// Subscribe adds the handler to be called into EventStream +func (eventStream *EventStream) Subscribe(name string, handler eventHandler) error { + eventStream.handlersLock.Lock() + defer eventStream.handlersLock.Unlock() + + if _, ok := eventStream.handlers[name]; ok { + return fmt.Errorf("handler %s already exists", name) + } + + eventStream.handlers[name] = handler + return nil +} + +// broadcast calls all handler's handler function +func (eventStream *EventStream) broadcast(event interface{}) { + eventStream.handlersLock.RLock() + defer eventStream.handlersLock.RUnlock() + + seelog.Debugf("Event stream %s received events, broadcasting to listeners...", eventStream.name) + + for _, handlerFunc := range eventStream.handlers { + go handlerFunc(event) + } +} + +// Unsubscribe deletes the handler from the EventStream +func (eventStream *EventStream) Unsubscribe(name string) { + eventStream.handlersLock.Lock() + defer eventStream.handlersLock.Unlock() + + for handler := range eventStream.handlers { + if handler == name { + seelog.Debugf("Unsubscribing event handler %s from event stream %s", handler, eventStream.name) + delete(eventStream.handlers, handler) + return + } + } +} + +// WriteToEventStream writes event to the event stream +func (eventStream *EventStream) WriteToEventStream(event interface{}) error { + eventStream.statusLock.RLock() + defer eventStream.statusLock.RUnlock() + + if !eventStream.open { + return fmt.Errorf("Event stream is closed") + } + eventStream.event <- event + return nil +} + +// Context returns the context of event stream +func (eventStream *EventStream) Context() context.Context { + return eventStream.ctx +} + +// listen listens to the event channel +func (eventStream *EventStream) listen() { + seelog.Infof("Event stream %s start listening...", eventStream.name) + for { + select { + case event := <-eventStream.event: + eventStream.broadcast(event) + case <-eventStream.ctx.Done(): + seelog.Infof("Event stream %s stopped listening...", eventStream.name) + + eventStream.statusLock.Lock() + eventStream.open = false + close(eventStream.event) + eventStream.statusLock.Unlock() + return + } + } +} + +// StartListening mark the event stream as open and start listening +func (eventStream *EventStream) StartListening() { + eventStream.statusLock.Lock() + defer eventStream.statusLock.Unlock() + eventStream.open = true + + go eventStream.listen() +} diff --git a/agent/eventstream/eventstream_test.go b/ecs-agent/eventstream/eventstream_test.go similarity index 100% rename from agent/eventstream/eventstream_test.go rename to ecs-agent/eventstream/eventstream_test.go