From 129ea7fb294ab50666aedc52f9a48880ebb14956 Mon Sep 17 00:00:00 2001 From: Leonard Goodell Date: Tue, 24 Jan 2023 17:22:15 -0700 Subject: [PATCH] refactor!: Rework Core Commands via messaging to use new MessageBus Requst API BREAKING CHANGE: Topics configuration for Core Command has changed. Also the internal response topic is always edgex/response//. The prefix for this is now part of the standard MessageBus configuration. closes #4309 Signed-off-by: Leonard Goodell --- cmd/core-command/res/configuration.toml | 14 +- go.mod | 12 +- go.sum | 34 +-- .../command/controller/messaging/external.go | 39 ++- .../controller/messaging/external_test.go | 38 +-- .../command/controller/messaging/internal.go | 289 +++++++++--------- .../controller/messaging/internal_test.go | 252 +++++++++++++++ .../messaging/mocks/MessagingRouter.go | 58 ---- .../command/controller/messaging/router.go | 65 ---- .../command/controller/messaging/utils.go | 13 +- internal/core/command/main.go | 20 +- 11 files changed, 477 insertions(+), 357 deletions(-) create mode 100644 internal/core/command/controller/messaging/internal_test.go delete mode 100644 internal/core/command/controller/messaging/mocks/MessagingRouter.go delete mode 100644 internal/core/command/controller/messaging/router.go diff --git a/cmd/core-command/res/configuration.toml b/cmd/core-command/res/configuration.toml index f244669714..ca2d887ea0 100644 --- a/cmd/core-command/res/configuration.toml +++ b/cmd/core-command/res/configuration.toml @@ -63,12 +63,10 @@ Port = 6379 AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure). SecretName = "redisdb" [MessageBus.Topics] - DeviceRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; /// will be added to this publish topic prefix - DeviceResponseTopic = "edgex/device/command/response/#" # for subscribing to device service responses - CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests - CommandResponseTopicPrefix = "edgex/core/command/response" # for publishing responses back to internal service /// will be added to this publish topic prefix - QueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests - QueryResponseTopic = "edgex/core/commandquery/response" # for publishing reponsses back to internal service + DeviceCommandRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; /// will be added to this publish topic prefix + CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests + CommandQueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests + ResponseTopicPrefix="edgex/response" # for subscribing/publishing internal responses (used by MessageBus Request API) [MessageBus.Optional] # Default MQTT Specific options that need to be here to enable evnironment variable overrides of them ClientId ="core-command" @@ -103,5 +101,5 @@ AuthMode = "none" [ExternalMQTT.Topics] CommandRequestTopic = "edgex/command/request/#" # for subscribing to 3rd party command requests CommandResponseTopicPrefix = "edgex/command/response" # for publishing responses back to 3rd party systems /// will be added to this publish topic prefix - QueryRequestTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request - QueryResponseTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems + CommandQueryRequestTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request + CommandQueryResponseTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems diff --git a/go.mod b/go.mod index 88bd22d569..d70a8b8ea5 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,11 @@ module github.com/edgexfoundry/edgex-go +replace ( + github.com/edgexfoundry/go-mod-core-contracts/v3 => ../MODS/go-mod-core-contracts + github.com/edgexfoundry/go-mod-messaging/v3 => ../MODS/go-mod-messaging +) + require ( - bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690 github.com/eclipse/paho.mqtt.golang v1.4.2 github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.16 github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 @@ -32,9 +36,9 @@ require ( github.com/fatih/color v1.9.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect - github.com/go-playground/locales v0.14.0 // indirect - github.com/go-playground/universal-translator v0.18.0 // indirect - github.com/go-playground/validator/v10 v10.11.1 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.11.2 // indirect github.com/go-redis/redis/v7 v7.3.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index b4c149014b..e41b0b4939 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690 h1:N9r8OBSXAgEUfho3SQtZLY8zo6E1OdOMvelvP22aVFc= -bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690/go.mod h1:Ulb78X89vxKYgdL24HMTiXYHlyHEvruOj1ZPlqeNEZM= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= @@ -32,10 +30,6 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.16 h1:MN6dOZHbYkW8JRDlEin github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.16/go.mod h1:G2C3aUWZ96nZU03XRvCBntU13eUjXGIB9KqRKrhI8h8= github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 h1:xp5MsP+qf/fuJxy8fT7k1N+c4j4C6w04qMCBXm6id7o= github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2/go.mod h1:1Vv4uWAo6r7k6jUlqVJW8JOL6YKVBc6sRL8Al3DrMck= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6 h1:RQFs/HjVOi1X3YxJ8sm4vuX8nhKgH0caSf9RtjQvdeI= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6/go.mod h1:7RwSq896VqelvSU7zYKs2tpZhgELVFECkiGf6XGLKfQ= -github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4 h1:swPZOjoQ/IUIWSJpZCmQENtP/plFRx5tgiCEZgnfxFU= -github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4/go.mod h1:8pxuYvh2zcq1GuKqmk1MAuH1yuN40iOMmL0g2myIfwk= github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg= github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk= github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.5 h1:tEo8BVH4OZuJ/q9ii1H4PdtxlXLh/kOKpRuWFTHOcBc= @@ -54,14 +48,13 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= -github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= -github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= -github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= -github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= -github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= -github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= -github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= -github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.11.2 h1:q3SHpufmypg+erIExEKUmsgmhDTyhcJ38oeKGACXohU= +github.com/go-playground/validator/v10 v10.11.2/go.mod h1:NieE624vt4SCTJtD87arVLvdmjPAeV8BQlHtMnw9D7s= github.com/go-redis/redis/v7 v7.3.0 h1:3oHqd0W7f/VLKBxeYTEpqdMUsmMectngjM9OtoRoIgg= github.com/go-redis/redis/v7 v7.3.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -145,7 +138,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -209,7 +201,6 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -231,9 +222,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= -github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= @@ -265,7 +254,6 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= @@ -279,7 +267,6 @@ golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= @@ -311,9 +298,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -324,7 +309,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= @@ -345,13 +329,10 @@ google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175 google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/eapache/queue.v1 v1.1.0 h1:EldqoJEGtXYiVCMRo2C9mePO2UUGnYn2+qLmlQSqPdc= gopkg.in/eapache/queue.v1 v1.1.0/go.mod h1:wNtmx1/O7kZSR9zNT1TTOJ7GLpm3Vn7srzlfylFbQwU= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= @@ -365,6 +346,5 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/core/command/controller/messaging/external.go b/internal/core/command/controller/messaging/external.go index fc47a72783..cc5342a678 100644 --- a/internal/core/command/controller/messaging/external.go +++ b/internal/core/command/controller/messaging/external.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -9,6 +10,7 @@ import ( "encoding/json" "fmt" "strings" + "time" mqtt "github.com/eclipse/paho.mqtt.golang" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" @@ -20,31 +22,22 @@ import ( "github.com/edgexfoundry/edgex-go/internal/core/command/container" ) -const ( - QueryRequestTopic = "QueryRequestTopic" - QueryResponseTopic = "QueryResponseTopic" - CommandRequestTopic = "CommandRequestTopic" - CommandResponseTopicPrefix = "CommandResponseTopicPrefix" - DeviceRequestTopicPrefix = "DeviceRequestTopicPrefix" - DeviceResponseTopic = "DeviceResponseTopic" -) - -func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectHandler { +func OnConnectHandler(requestTimeout time.Duration, dic *di.Container) mqtt.OnConnectHandler { return func(client mqtt.Client) { lc := bootstrapContainer.LoggingClientFrom(dic.Get) config := container.ConfigurationFrom(dic.Get) externalTopics := config.ExternalMQTT.Topics qos := config.ExternalMQTT.QoS - requestQueryTopic := externalTopics[QueryRequestTopic] + requestQueryTopic := externalTopics[common.CommandQueryRequestTopicKey] if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(dic)); token.Wait() && token.Error() != nil { lc.Errorf("could not subscribe to topic '%s': %s", requestQueryTopic, token.Error().Error()) } else { lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestQueryTopic) } - requestCommandTopic := externalTopics[CommandRequestTopic] - if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(router, dic)); token.Wait() && token.Error() != nil { + requestCommandTopic := externalTopics[common.CommandRequestTopicKey] + if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(requestTimeout, dic)); token.Wait() && token.Error() != nil { lc.Errorf("could not subscribe to topic '%s': %s", requestCommandTopic, token.Error().Error()) } else { lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestCommandTopic) @@ -65,7 +58,7 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler { } externalMQTTInfo := container.ConfigurationFrom(dic.Get).ExternalMQTT - responseTopic := externalMQTTInfo.Topics[QueryResponseTopic] + responseTopic := externalMQTTInfo.Topics[common.ExternalCommandQueryResponseTopicKey] if responseTopic == "" { lc.Error("QueryResponseTopic not provided in External.Topics") lc.Warn("Not publishing error message back due to insufficient information on response topic") @@ -91,7 +84,7 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler { } } -func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.MessageHandler { +func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt.MessageHandler { return func(client mqtt.Client, message mqtt.Message) { lc := bootstrapContainer.LoggingClientFrom(dic.Get) lc.Debugf("Received command request from external message broker on topic '%s' with %d bytes", message.Topic(), len(message.Payload())) @@ -124,10 +117,11 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa lc.Warn("Not publishing error message back due to insufficient information on response topic") return } - externalResponseTopic := strings.Join([]string{externalMQTTInfo.Topics[CommandResponseTopicPrefix], deviceName, commandName, method}, "/") + + externalResponseTopic := strings.Join([]string{externalMQTTInfo.Topics[common.ExternalCommandResponseTopicPrefixKey], deviceName, commandName, method}, "/") internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[DeviceRequestTopicPrefix], deviceName, commandName, method, dic) + deviceServiceName, deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[common.DeviceCommandRequestTopicPrefixKey], deviceName, commandName, method, dic) if err != nil { responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) @@ -142,7 +136,11 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa } internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get) - err = internalMessageBus.Publish(requestEnvelope, deviceRequestTopic) + + lc.Debugf("Sending Command request to internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", deviceRequestTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID) + + // Request waits for the response and returns it. + response, err := internalMessageBus.Request(requestEnvelope, deviceServiceName, deviceRequestTopic, requestTimeout) if err != nil { errorMessage := fmt.Sprintf("Failed to send DeviceCommand request with internal MessageBus: %v", err) responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) @@ -150,8 +148,9 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa return } - lc.Debugf("Command request sent to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID) - router.SetResponseTopic(requestEnvelope.RequestID, externalResponseTopic, true) + lc.Debugf("Command response received from internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", response.RequestID, response.CorrelationID) + + publishMessage(client, externalResponseTopic, qos, retain, *response, lc) } } diff --git a/internal/core/command/controller/messaging/external_test.go b/internal/core/command/controller/messaging/external_test.go index 0541818a53..500671f7eb 100644 --- a/internal/core/command/controller/messaging/external_test.go +++ b/internal/core/command/controller/messaging/external_test.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022-2023 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -12,6 +13,7 @@ import ( "net/http" "strings" "testing" + "time" clientMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces/mocks" lcMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger/mocks" @@ -53,12 +55,10 @@ const ( testExternalCommandRequestTopic = "unittest/external/request/#" testExternalCommandRequestTopicExample = "unittest/external/request/testDevice/testCommand/get" testExternalCommandResponseTopicPrefix = "unittest/external/response" - - testInternalCommandRequestTopicPrefix = "unittest/internal/request" + testInternalCommandRequestTopicPrefix = "unittest/internal/request" ) func TestOnConnectHandler(t *testing.T) { - mockRouter := &mocks.MessagingRouter{} lc := &lcMocks.LoggingClient{} lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil) lc.On("Debugf", mock.Anything, mock.Anything).Return(nil) @@ -67,10 +67,10 @@ func TestOnConnectHandler(t *testing.T) { return &config.ConfigurationStruct{ ExternalMQTT: bootstrapConfig.ExternalMQTTInfo{ Topics: map[string]string{ - QueryRequestTopic: testQueryRequestTopic, - QueryResponseTopic: testQueryResponseTopic, - CommandRequestTopic: testExternalCommandRequestTopic, - CommandResponseTopicPrefix: testExternalCommandResponseTopicPrefix, + common.CommandRequestTopicKey: testExternalCommandRequestTopic, + common.ExternalCommandResponseTopicPrefixKey: testExternalCommandResponseTopicPrefix, + common.CommandQueryRequestTopicKey: testQueryRequestTopic, + common.ExternalCommandQueryResponseTopicKey: testQueryResponseTopic, }, QoS: 0, Retain: true, @@ -103,7 +103,7 @@ func TestOnConnectHandler(t *testing.T) { client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token) client.On("Subscribe", testExternalCommandRequestTopic, byte(0), mock.Anything).Return(token) - fn := OnConnectHandler(mockRouter, dic) + fn := OnConnectHandler(time.Second*10, dic) fn(client) if tt.expectedSucceed { @@ -124,7 +124,7 @@ func Test_commandQueryHandler(t *testing.T) { Name: testProfileName, }, DeviceResources: []dtos.DeviceResource{ - dtos.DeviceResource{ + { Name: testResourceName, Properties: dtos.ResourceProperties{ ValueType: common.ValueTypeString, @@ -144,7 +144,7 @@ func Test_commandQueryHandler(t *testing.T) { allDevicesResponse := responses.MultiDevicesResponse{ BaseWithTotalCountResponse: commonDTO.NewBaseWithTotalCountResponse("", "", http.StatusOK, 1), Devices: []dtos.Device{ - dtos.Device{ + { Name: testDeviceName, ProfileName: testProfileName, }, @@ -173,7 +173,7 @@ func Test_commandQueryHandler(t *testing.T) { QoS: 0, Retain: true, Topics: map[string]string{ - QueryResponseTopic: testQueryResponseTopic, + common.ExternalCommandQueryResponseTopicKey: testQueryResponseTopic, }, }, } @@ -269,12 +269,12 @@ func Test_commandRequestHandler(t *testing.T) { }, } - mockRouter := &mocks.MessagingRouter{} - mockRouter.On("SetResponseTopic", mock.Anything, mock.Anything, mock.Anything).Return(nil) + expectedResponse := &types.MessageEnvelope{} + lc := &lcMocks.LoggingClient{} lc.On("Error", mock.Anything).Return(nil) lc.On("Errorf", mock.Anything, mock.Anything).Return(nil) - lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything).Return(nil) + lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) lc.On("Warn", mock.Anything).Return(nil) dc := &clientMocks.DeviceClient{} dc.On("DeviceByName", context.Background(), testDeviceName).Return(deviceResponse, nil) @@ -284,7 +284,7 @@ func Test_commandRequestHandler(t *testing.T) { dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil) dsc.On("DeviceServiceByName", context.Background(), unknownService).Return(responses.DeviceServiceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device service", nil)) client := &internalMessagingMocks.MessageClient{} - client.On("Publish", mock.Anything, mock.Anything).Return(nil) + client.On("Request", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedResponse, nil) dic := di.NewContainer(di.ServiceConstructorMap{ container.ConfigurationName: func(get di.Get) interface{} { return &config.ConfigurationStruct{ @@ -295,14 +295,14 @@ func Test_commandRequestHandler(t *testing.T) { }, MessageBus: bootstrapConfig.MessageBusInfo{ Topics: map[string]string{ - DeviceRequestTopicPrefix: testInternalCommandRequestTopicPrefix, + common.DeviceCommandRequestTopicPrefixKey: testInternalCommandRequestTopicPrefix, }, }, ExternalMQTT: bootstrapConfig.ExternalMQTTInfo{ QoS: 0, Retain: true, Topics: map[string]string{ - CommandResponseTopicPrefix: testExternalCommandResponseTopicPrefix, + common.ExternalCommandResponseTopicPrefixKey: testExternalCommandResponseTopicPrefix, }, }, } @@ -359,7 +359,7 @@ func Test_commandRequestHandler(t *testing.T) { mqttClient := &mocks.Client{} mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token) - fn := commandRequestHandler(mockRouter, dic) + fn := commandRequestHandler(time.Second*10, dic) fn(mqttClient, message) if tt.expectedError { if tt.expectedPublishError { @@ -372,7 +372,7 @@ func Test_commandRequestHandler(t *testing.T) { } expectedInternalRequestTopic := strings.Join([]string{testInternalCommandRequestTopicPrefix, testDeviceServiceName, testDeviceName, testCommandName, testMethod}, "/") - client.AssertCalled(t, "Publish", tt.payload, expectedInternalRequestTopic) + client.AssertCalled(t, "Request", tt.payload, testDeviceServiceName, expectedInternalRequestTopic, mock.Anything) }) } } diff --git a/internal/core/command/controller/messaging/internal.go b/internal/core/command/controller/messaging/internal.go index 2f8a590a5a..093345ccb9 100644 --- a/internal/core/command/controller/messaging/internal.go +++ b/internal/core/command/controller/messaging/internal.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022-2023 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -7,10 +8,14 @@ package messaging import ( "context" + "fmt" "strings" + "time" + "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger" "github.com/edgexfoundry/go-mod-core-contracts/v3/common" "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" + "github.com/edgexfoundry/go-mod-messaging/v3/messaging" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v3/di" @@ -20,17 +25,18 @@ import ( "github.com/edgexfoundry/edgex-go/internal/core/command/container" ) -// SubscribeCommandResponses subscribes command responses from device services via internal MessageBus -func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic *di.Container) errors.EdgeX { +// SubscribeCommandRequests subscribes command requests from EdgeX service (e.g., Application Service) +// and forwards them to the appropriate Device Service via internal MessageBus +func SubscribeCommandRequests(ctx context.Context, requestTimeout time.Duration, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) - internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - internalResponseTopic := internalMessageBusInfo.Topics[DeviceResponseTopic] + messageBusTopics := container.ConfigurationFrom(dic.Get).MessageBus.Topics + requestCommandTopic := messageBusTopics[common.CommandRequestTopicKey] messages := make(chan types.MessageEnvelope) messageErrors := make(chan error) topics := []types.TopicChannel{ { - Topic: internalResponseTopic, + Topic: requestCommandTopic, Messages: messages, }, } @@ -41,40 +47,16 @@ func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic return errors.NewCommonEdgeXWrapper(err) } - externalMQTTInfo := container.ConfigurationFrom(dic.Get).ExternalMQTT - qos := externalMQTTInfo.QoS - retain := externalMQTTInfo.Retain - externalMQTT := bootstrapContainer.ExternalMQTTMessagingClientFrom(dic.Get) go func() { for { select { case <-ctx.Done(): - lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalResponseTopic) + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", requestCommandTopic) return case err = <-messageErrors: lc.Error(err.Error()) - case msgEnvelope := <-messages: - lc.Debugf("Command response received on internal MessageBus. Topic: %s, Correlation-id: %s", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID) - - responseTopic, external, err := router.ResponseTopic(msgEnvelope.RequestID) - if err != nil { - lc.Errorf("Received RequestEnvelope with unknown RequestId %s", msgEnvelope.RequestID) - continue - } - - // original request is from external MQTT - if external { - publishMessage(externalMQTT, responseTopic, qos, retain, msgEnvelope, lc) - continue - } - - // original request is from internal MessageBus - err = messageBus.Publish(msgEnvelope, responseTopic) - if err != nil { - lc.Errorf("Could not publish to internal MessageBus topic '%s': %s", responseTopic, err.Error()) - continue - } - lc.Debugf("Command response sent to internal MessageBus. Topic: %s, Correlation-id: %s", responseTopic, msgEnvelope.CorrelationID) + case requestEnvelope := <-messages: + processDeviceCommandRequest(messageBus, requestEnvelope, messageBusTopics, requestTimeout, lc, dic) } } }() @@ -82,116 +64,118 @@ func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic return nil } -// SubscribeCommandRequests subscribes command requests from EdgeX service (e.g., Application Service) -// via internal MessageBus -func SubscribeCommandRequests(ctx context.Context, router MessagingRouter, dic *di.Container) errors.EdgeX { - lc := bootstrapContainer.LoggingClientFrom(dic.Get) - internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - internalRequestCommandTopic := internalMessageBusInfo.Topics[CommandRequestTopic] +func processDeviceCommandRequest( + messageBus messaging.MessageClient, + requestEnvelope types.MessageEnvelope, + messageBusTopics map[string]string, + requestTimeout time.Duration, + lc logger.LoggingClient, + dic *di.Container) { + var err error + + lc.Debugf("Command device request received on internal MessageBus. Topic: %s, Request-id: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID) + + if len(strings.TrimSpace(requestEnvelope.RequestID)) == 0 { + lc.Errorf("RequestId not set in Command request received on internal MessageBus") + lc.Warn("Not publishing error message back due to insufficient information to publish on response topic") + return + } - messages := make(chan types.MessageEnvelope) - messageErrors := make(chan error) - topics := []types.TopicChannel{ - { - Topic: internalRequestCommandTopic, - Messages: messages, - }, + // internal response topic scheme: // + internalResponseTopic := strings.Join([]string{messageBusTopics[common.ResponseTopicPrefixKey], common.CoreCommandServiceKey, requestEnvelope.RequestID}, "/") + + topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") + length := len(topicLevels) + if length < 3 { + err = fmt.Errorf("invalid internal command request topic scheme. Expected request topic scheme with >=3 levels: '///'") + lc.Error(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) + } + return } - messageBus := bootstrapContainer.MessagingClientFrom(dic.Get) - err := messageBus.Subscribe(topics, messageErrors) + // expected internal command request/response topic scheme: #/// + deviceName := topicLevels[length-3] + commandName := topicLevels[length-2] + method := topicLevels[length-1] + if !strings.EqualFold(method, "get") && !strings.EqualFold(method, "set") { + err = fmt.Errorf("unknown request method: %s, only 'get' or 'set' is allowed", method) + lc.Error(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) + } + return + } + + // internal command request topic scheme: //// + deviceServiceName, deviceRequestTopic, err := validateRequestTopic(messageBusTopics[common.DeviceCommandRequestTopicPrefixKey], deviceName, commandName, method, dic) if err != nil { - return errors.NewCommonEdgeXWrapper(err) + err = fmt.Errorf("invalid request topic: %s", err.Error()) + lc.Error(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) + } + return } - go func() { - for { - select { - case <-ctx.Done(): - lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalRequestCommandTopic) - return - case err = <-messageErrors: - lc.Error(err.Error()) - case requestEnvelope := <-messages: - lc.Debugf("Command request received on internal MessageBus. Topic: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID) - - topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") - length := len(topicLevels) - if length < 3 { - lc.Error("Failed to parse and construct internal command response topic scheme, expected request topic scheme: '#///'") - lc.Warn("Not publishing error message back due to insufficient information on response topic") - continue - } - - // expected internal command request/response topic scheme: #/// - deviceName := topicLevels[length-3] - commandName := topicLevels[length-2] - method := topicLevels[length-1] - if !strings.EqualFold(method, "get") && !strings.EqualFold(method, "set") { - lc.Errorf("Unknown request method: %s, only 'get' or 'set' is allowed", method) - lc.Warn("Not publishing error message back due to insufficient information on response topic") - continue - } - internalResponseTopic := strings.Join([]string{internalMessageBusInfo.Topics[CommandResponseTopicPrefix], deviceName, commandName, method}, "/") - - deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[DeviceRequestTopicPrefix], deviceName, commandName, method, dic) - if err != nil { - lc.Errorf("invalid request topic: %s", err.Error()) - responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) - err = messageBus.Publish(responseEnvelope, internalResponseTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) - } - - continue - } - - err = validateGetCommandQueryParameters(requestEnvelope.QueryParams) - if err != nil { - lc.Errorf(err.Error()) - responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) - err = messageBus.Publish(responseEnvelope, internalResponseTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) - } - - continue - } - - // expected internal command request topic scheme: #//// - err = messageBus.Publish(requestEnvelope, deviceRequestTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", deviceRequestTopic, err.Error()) - continue - } - - lc.Debugf("Command request sent to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID) - router.SetResponseTopic(requestEnvelope.RequestID, internalResponseTopic, false) - } + err = validateGetCommandQueryParameters(requestEnvelope.QueryParams) + if err != nil { + lc.Errorf(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) } - }() + return + } - return nil + lc.Debugf("Sending Command Device Request to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID) + + deviceResponseTopicPrefix := strings.Join([]string{messageBusTopics[common.ResponseTopicPrefixKey], deviceServiceName}, "/") + + response, err := messageBus.Request(requestEnvelope, deviceRequestTopic, deviceResponseTopicPrefix, requestTimeout) + if err != nil { + lc.Errorf("Request to topic '%s' failed: %s", deviceRequestTopic, err.Error()) + return + } + + // original request is from internal MessageBus + err = messageBus.Publish(*response, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to internal MessageBus topic '%s': %s", internalResponseTopic, err.Error()) + return + } + + lc.Debugf("Command response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalResponseTopic, response.CorrelationID) } // SubscribeCommandQueryRequests subscribes command query requests from EdgeX service (e.g., Application Service) // via internal MessageBus func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) - internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - internalQueryRequestTopic := internalMessageBusInfo.Topics[QueryRequestTopic] - internalQueryResponseTopic := internalMessageBusInfo.Topics[QueryResponseTopic] + messageBusTopics := container.ConfigurationFrom(dic.Get).MessageBus.Topics + queryRequestTopic := messageBusTopics[common.CommandQueryRequestTopicKey] messages := make(chan types.MessageEnvelope) messageErrors := make(chan error) topics := []types.TopicChannel{ { - Topic: internalQueryRequestTopic, + Topic: queryRequestTopic, Messages: messages, }, } messageBus := bootstrapContainer.MessagingClientFrom(dic.Get) + + lc.Infof("Subscribing to internal command query requests on topic: %s", queryRequestTopic) + err := messageBus.Subscribe(topics, messageErrors) if err != nil { return errors.NewCommonEdgeXWrapper(err) @@ -201,37 +185,58 @@ func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) error for { select { case <-ctx.Done(): - lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalQueryRequestTopic) + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", queryRequestTopic) return case err = <-messageErrors: lc.Error(err.Error()) case requestEnvelope := <-messages: - lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID) - - // example topic scheme: /commandquery/request/ - // deviceName is expected to be at last topic level. - topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") - deviceName := topicLevels[len(topicLevels)-1] - if strings.EqualFold(deviceName, common.All) { - deviceName = common.All - } - - responseEnvelope, err := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic) - if err != nil { - lc.Error(err.Error()) - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) - } - - err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", internalQueryResponseTopic, err.Error()) - continue - } - - lc.Debugf("Command query response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalQueryResponseTopic, requestEnvelope.CorrelationID) + processCommandQueryRequest(messageBus, requestEnvelope, messageBusTopics, lc, dic) } } }() return nil } + +func processCommandQueryRequest( + messageBus messaging.MessageClient, + requestEnvelope types.MessageEnvelope, + messageBusTopics map[string]string, + lc logger.LoggingClient, + dic *di.Container, +) { + lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Request-id: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID) + + if len(strings.TrimSpace(requestEnvelope.RequestID)) == 0 { + lc.Errorf("RequestId not set in Command request received on internal MessageBus") + lc.Warn("Not publishing error message back due to insufficient information to publish on response topic") + return + } + + // example topic scheme: /commandquery/request/ + // deviceName is expected to be at last topic level. + topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") + deviceName := topicLevels[len(topicLevels)-1] + if strings.EqualFold(deviceName, common.All) { + deviceName = common.All + } + + responseEnvelope, err := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic) + if err != nil { + lc.Error(err.Error()) + responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + } + + // internal response topic scheme: // + internalQueryResponseTopic := strings.Join([]string{messageBusTopics[common.ResponseTopicPrefixKey], common.CoreCommandServiceKey, requestEnvelope.RequestID}, "/") + + lc.Debugf("Responding to command query request on topic: %s", internalQueryResponseTopic) + + err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalQueryResponseTopic, err.Error()) + return + } + + lc.Debugf("Command query response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalQueryResponseTopic, requestEnvelope.CorrelationID) +} diff --git a/internal/core/command/controller/messaging/internal_test.go b/internal/core/command/controller/messaging/internal_test.go new file mode 100644 index 0000000000..fa2291394d --- /dev/null +++ b/internal/core/command/controller/messaging/internal_test.go @@ -0,0 +1,252 @@ +// Copyright (C) 2023 Intel Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +package messaging + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "github.com/edgexfoundry/edgex-go/internal/core/command/config" + "github.com/edgexfoundry/edgex-go/internal/core/command/container" + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" + config2 "github.com/edgexfoundry/go-mod-bootstrap/v3/config" + "github.com/edgexfoundry/go-mod-bootstrap/v3/di" + mocks2 "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces/mocks" + lcMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger/mocks" + "github.com/edgexfoundry/go-mod-core-contracts/v3/common" + "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos" + "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/responses" + "github.com/edgexfoundry/go-mod-messaging/v3/messaging/mocks" + "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +var expectedResponseTopicPrefix = "edgex/response" +var expectedProfileName = "TestProfile" + +func TestSubscribeCommandRequests(t *testing.T) { + wg := sync.WaitGroup{} + expectedServiceName := "device-simple" + expectedRequestId := uuid.NewString() + expectedCorrelationId := uuid.NewString() + expectedDevice := "device1" + expectedResource := "resource" + expectedMethod := "get" + expectedDeviceResponseTopicPrefix := strings.Join([]string{expectedResponseTopicPrefix, expectedServiceName}, "/") + expectedCommandResponseTopic := strings.Join([]string{expectedResponseTopicPrefix, common.CoreCommandServiceKey, expectedRequestId}, "/") + expectedDeviceRequestTopic := strings.Join([]string{"edgex/device/command/request", expectedServiceName, expectedDevice, expectedResource, expectedMethod}, "/") + + mockLogger := &lcMocks.LoggingClient{} + mockDeviceClient := &mocks2.DeviceClient{} + mockDeviceProfileClient := &mocks2.DeviceProfileClient{} + mockDeviceServiceClient := &mocks2.DeviceServiceClient{} + mockMessaging := &mocks.MessageClient{} + + mockLogger.On("Debugf", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + + mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + topics := args.Get(0).([]types.TopicChannel) + require.Len(t, topics, 1) + require.Equal(t, expectedDeviceRequestTopic, topics[0].Topic) + wg.Add(1) + go func() { + defer wg.Done() + topics[0].Messages <- types.MessageEnvelope{ + RequestID: expectedRequestId, + CorrelationID: expectedCorrelationId, + ReceivedTopic: expectedDeviceRequestTopic, + } + time.Sleep(time.Second * 1) + }() + }).Return(nil) + + mockMessaging.On("Request", mock.Anything, expectedDeviceRequestTopic, expectedDeviceResponseTopicPrefix, mock.Anything).Run(func(args mock.Arguments) { + }).Return(&types.MessageEnvelope{ + RequestID: expectedRequestId, + CorrelationID: expectedCorrelationId, + ContentType: types.ContentTypeJSON, + Payload: []byte("This is my payload"), + }, nil) + + mockMessaging.On("Publish", mock.Anything, expectedCommandResponseTopic).Run(func(args mock.Arguments) { + response := args.Get(0).(types.MessageEnvelope) + assert.Equal(t, expectedRequestId, response.RequestID) + assert.Equal(t, expectedCorrelationId, response.CorrelationID) + assert.Equal(t, types.ContentTypeJSON, response.ContentType) + assert.NotZero(t, len(response.Payload)) + }).Return(nil) + + mockDeviceClient.On("DeviceByName", mock.Anything, expectedDevice).Return( + responses.DeviceResponse{ + Device: dtos.Device{ + ProfileName: expectedProfileName, + ServiceName: expectedServiceName, + }, + }, + nil) + + mockDeviceServiceClient.On("DeviceServiceByName", mock.Anything, expectedServiceName).Return( + responses.DeviceServiceResponse{ + Service: dtos.DeviceService{ + Name: expectedServiceName, + }, + }, + nil) + + dic := di.NewContainer(di.ServiceConstructorMap{ + container.ConfigurationName: func(get di.Get) interface{} { + return &config.ConfigurationStruct{ + MessageBus: config2.MessageBusInfo{ + Topics: map[string]string{ + common.CommandRequestTopicKey: expectedDeviceRequestTopic, + common.ResponseTopicPrefixKey: expectedResponseTopicPrefix, + common.DeviceCommandRequestTopicPrefixKey: "edgex/device/command/request", + }, + }, + } + }, + bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { + return mockLogger + }, + bootstrapContainer.MessagingClientName: func(get di.Get) interface{} { + return mockMessaging + }, + bootstrapContainer.DeviceClientName: func(get di.Get) interface{} { + return mockDeviceClient + }, + bootstrapContainer.DeviceProfileClientName: func(get di.Get) interface{} { + return mockDeviceProfileClient + }, + bootstrapContainer.DeviceServiceClientName: func(get di.Get) interface{} { + return mockDeviceServiceClient + }, + }) + + err := SubscribeCommandRequests(context.Background(), time.Second*5, dic) + require.NoError(t, err) + + wg.Wait() + + mockMessaging.AssertExpectations(t) +} + +func TestSubscribeCommandQueryRequests(t *testing.T) { + wg := sync.WaitGroup{} + expectedRequestId := uuid.NewString() + expectedCorrelationId := uuid.NewString() + expectedResponseTopic := strings.Join([]string{expectedResponseTopicPrefix, common.CoreCommandServiceKey, expectedRequestId}, "/") + + tests := []struct { + Name string + ExpectedDeviceName string + }{ + {"By Device", "Device1"}, + {"All Devices", "All"}, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + + mockLogger := &lcMocks.LoggingClient{} + mockDeviceClient := &mocks2.DeviceClient{} + mockDeviceProfileClient := &mocks2.DeviceProfileClient{} + mockMessaging := &mocks.MessageClient{} + + mockLogger.On("Debugf", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + mockLogger.On("Errorf", mock.Anything).Run(func(args mock.Arguments) { + require.Fail(t, "Errorf not expected") + }) + mockLogger.On("Error", mock.Anything).Run(func(args mock.Arguments) { + require.Fail(t, "Error not expected") + }) + + expectedRequestTopic := "edgex/commandquery/request/" + test.ExpectedDeviceName + + mockDeviceClient.On("DeviceByName", mock.Anything, test.ExpectedDeviceName).Return( + responses.DeviceResponse{ + Device: dtos.Device{ + ProfileName: expectedProfileName, + }, + }, + nil) + + mockDeviceClient.On("AllDevices", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + responses.MultiDevicesResponse{ + Devices: []dtos.Device{ + { + ProfileName: expectedProfileName, + }, + }, + }, + nil) + + mockDeviceProfileClient.On("DeviceProfileByName", mock.Anything, expectedProfileName).Return( + responses.DeviceProfileResponse{}, + nil) + + mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + topics := args.Get(0).([]types.TopicChannel) + require.Len(t, topics, 1) + require.Equal(t, expectedRequestTopic, topics[0].Topic) + wg.Add(1) + go func() { + defer wg.Done() + topics[0].Messages <- types.MessageEnvelope{ + RequestID: expectedRequestId, + CorrelationID: expectedCorrelationId, + ReceivedTopic: expectedRequestTopic, + } + time.Sleep(time.Second * 1) + }() + }).Return(nil) + mockMessaging.On("Publish", mock.Anything, expectedResponseTopic).Run(func(args mock.Arguments) { + response := args.Get(0).(types.MessageEnvelope) + assert.Equal(t, expectedRequestId, response.RequestID) + assert.Equal(t, expectedCorrelationId, response.CorrelationID) + assert.Equal(t, types.ContentTypeJSON, response.ContentType) + assert.NotZero(t, len(response.Payload)) + }).Return(nil) + + dic := di.NewContainer(di.ServiceConstructorMap{ + container.ConfigurationName: func(get di.Get) interface{} { + return &config.ConfigurationStruct{ + MessageBus: config2.MessageBusInfo{ + Topics: map[string]string{ + common.CommandQueryRequestTopicKey: expectedRequestTopic, + common.ResponseTopicPrefixKey: expectedResponseTopicPrefix, + }, + }, + } + }, + bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { + return mockLogger + }, + bootstrapContainer.MessagingClientName: func(get di.Get) interface{} { + return mockMessaging + }, + bootstrapContainer.DeviceClientName: func(get di.Get) interface{} { + return mockDeviceClient + }, + bootstrapContainer.DeviceProfileClientName: func(get di.Get) interface{} { + return mockDeviceProfileClient + }, + }) + + err := SubscribeCommandQueryRequests(context.Background(), dic) + require.NoError(t, err) + + wg.Wait() + + mockMessaging.AssertExpectations(t) + + }) + } +} diff --git a/internal/core/command/controller/messaging/mocks/MessagingRouter.go b/internal/core/command/controller/messaging/mocks/MessagingRouter.go deleted file mode 100644 index e55831937f..0000000000 --- a/internal/core/command/controller/messaging/mocks/MessagingRouter.go +++ /dev/null @@ -1,58 +0,0 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -// MessagingRouter is an autogenerated mock type for the MessagingRouter type -type MessagingRouter struct { - mock.Mock -} - -// ResponseTopic provides a mock function with given fields: requestId -func (_m *MessagingRouter) ResponseTopic(requestId string) (string, bool, error) { - ret := _m.Called(requestId) - - var r0 string - if rf, ok := ret.Get(0).(func(string) string); ok { - r0 = rf(requestId) - } else { - r0 = ret.Get(0).(string) - } - - var r1 bool - if rf, ok := ret.Get(1).(func(string) bool); ok { - r1 = rf(requestId) - } else { - r1 = ret.Get(1).(bool) - } - - var r2 error - if rf, ok := ret.Get(2).(func(string) error); ok { - r2 = rf(requestId) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// SetResponseTopic provides a mock function with given fields: requestId, topic, external -func (_m *MessagingRouter) SetResponseTopic(requestId string, topic string, external bool) { - _m.Called(requestId, topic, external) -} - -type mockConstructorTestingTNewMessagingRouter interface { - mock.TestingT - Cleanup(func()) -} - -// NewMessagingRouter creates a new instance of MessagingRouter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMessagingRouter(t mockConstructorTestingTNewMessagingRouter) *MessagingRouter { - mock := &MessagingRouter{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/internal/core/command/controller/messaging/router.go b/internal/core/command/controller/messaging/router.go deleted file mode 100644 index 2f57c04888..0000000000 --- a/internal/core/command/controller/messaging/router.go +++ /dev/null @@ -1,65 +0,0 @@ -// -// Copyright (C) 2022 IOTech Ltd -// -// SPDX-License-Identifier: Apache-2.0 - -package messaging - -import ( - "errors" - "sync" -) - -// MessagingRouter defines interface for Command Service to know -// where to route the receiving device command response. -type MessagingRouter interface { - // ResponseTopic returns the responseTopicPrefix by requestId, and a boolean value - // indicates its original source(external MQTT or internal MessageBus). - ResponseTopic(requestId string) (string, bool, error) - // SetResponseTopic sets the responseTopicPrefix with RequestId as the key - SetResponseTopic(requestId string, topic string, external bool) -} - -func NewMessagingRouter() MessagingRouter { - return &router{ - internalCommandRequestMap: make(map[string]string), - externalCommandRequestMap: make(map[string]string), - } -} - -type router struct { - mutex sync.Mutex - internalCommandRequestMap map[string]string - externalCommandRequestMap map[string]string -} - -func (r *router) ResponseTopic(requestId string) (string, bool, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - topic, ok := r.externalCommandRequestMap[requestId] - if ok { - delete(r.externalCommandRequestMap, requestId) - return topic, true, nil - } - - topic, ok = r.internalCommandRequestMap[requestId] - if ok { - delete(r.internalCommandRequestMap, requestId) - return topic, false, nil - } - - return "", false, errors.New("requestId not found") -} - -func (r *router) SetResponseTopic(requestId string, topic string, external bool) { - r.mutex.Lock() - defer r.mutex.Unlock() - - if external { - r.externalCommandRequestMap[requestId] = topic - return - } - - r.internalCommandRequestMap[requestId] = topic -} diff --git a/internal/core/command/controller/messaging/utils.go b/internal/core/command/controller/messaging/utils.go index 03fa82da96..5372975e8b 100644 --- a/internal/core/command/controller/messaging/utils.go +++ b/internal/core/command/controller/messaging/utils.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -25,29 +26,29 @@ import ( // validateRequestTopic validates the request topic by checking the existence of device and device service, // returns the internal device request topic to which the command request will be sent. -func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, error) { +func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, string, error) { // retrieve device information through Metadata DeviceClient dc := bootstrapContainer.DeviceClientFrom(dic.Get) if dc == nil { - return "", errors.New("nil Device Client") + return "", "", errors.New("nil Device Client") } deviceResponse, err := dc.DeviceByName(context.Background(), deviceName) if err != nil { - return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) + return "", "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) } // retrieve device service information through Metadata DeviceClient dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get) if dsc == nil { - return "", errors.New("nil DeviceService Client") + return "", "", errors.New("nil DeviceService Client") } deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName) if err != nil { - return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) + return "", "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) } // expected internal command request topic scheme: #//// - return strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil + return deviceServiceResponse.Service.Name, strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil } diff --git a/internal/core/command/main.go b/internal/core/command/main.go index 89ea02dca1..2726d4df5d 100644 --- a/internal/core/command/main.go +++ b/internal/core/command/main.go @@ -1,6 +1,7 @@ /******************************************************************************* * Copyright 2020 Dell Inc. * Copyright 2022-2023 IOTech Ltd. + * Copyright 2023 Intel Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,6 +20,7 @@ import ( "context" "os" "sync" + "time" "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" @@ -41,7 +43,7 @@ import ( func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) { startupTimer := startup.NewStartUpTimer(common.CoreCommandServiceKey) - // All common command-line flags have been moved to DefaultCommonFlags. Service specific flags can be add here, + // All common command-line flags have been moved to DefaultCommonFlags. Service specific flags can be added here, // by inserting service specific flag prior to call to commonFlags.Parse(). // Example: // flags.FlagSet.StringVar(&myvar, "m", "", "Specify a ....") @@ -86,10 +88,15 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) { func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool { lc := bootstrapContainer.LoggingClientFrom(dic.Get) configuration := container.ConfigurationFrom(dic.Get) - router := messaging.NewMessagingRouter() + + requestTimeout, err := time.ParseDuration(configuration.Service.RequestTimeout) + if err != nil { + lc.Errorf("Failed to parse Service.RequestTimeout configuration value: %v", err) + return false + } if configuration.ExternalMQTT.Enabled { - if !handlers.NewExternalMQTT(messaging.OnConnectHandler(router, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { + if !handlers.NewExternalMQTT(messaging.OnConnectHandler(requestTimeout, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { return false } } @@ -97,14 +104,11 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) { return false } - if err := messaging.SubscribeCommandRequests(ctx, router, dic); err != nil { + if err := messaging.SubscribeCommandRequests(ctx, requestTimeout, dic); err != nil { lc.Errorf("Failed to subscribe commands request from internal message bus, %v", err) return false } - if err := messaging.SubscribeCommandResponses(ctx, router, dic); err != nil { - lc.Errorf("Failed to subscribe commands response from internal message bus, %v", err) - return false - } + if err := messaging.SubscribeCommandQueryRequests(ctx, dic); err != nil { lc.Errorf("Failed to subscribe command query request from internal message bus, %v", err) return false