diff --git a/NOTICE.md b/NOTICE.md index c2c00f8c..8c6fb165 100644 --- a/NOTICE.md +++ b/NOTICE.md @@ -58,6 +58,42 @@ apache/qpid-proton (0.36.0) * Project: https://github.com/apache/qpid-proton * Source: https://github.com/apache/qpid-proton/tree/0.36.0 +github.com/caarlos0/env (6.10.1) + +* License: MIT License +* Project: https://github.com/caarlos0/env +* Source: https://github.com/caarlos0/env/releases/tag/v6.10.1 + +github.com/stretchr/testify (1.8.1) + +* License: MIT License +* Project: https://github.com/stretchr/testify +* Source: https://github.com/stretchr/testify/releases/tag/v1.8.1 + +eclipse/paho.mqtt.golang (1.4.1) + +* License: Eclipse Distribution License v1.0 +* Project: https://github.com/eclipse/paho.mqtt.golang +* Source: https://github.com/eclipse/paho.mqtt.golang/releases/tag/v1.4.1 + +google/uuid (1.3.0) + +* License: BSD 3-Clause "New" or "Revised" License +* Project: https://github.com/google/uuid +* Source: https://github.com/google/uuid/releases/tag/v1.3.0 + +golang.org/x/net (0.0.0-20210405180319-a5a99cb37ef4) + +* License: BSD 3-Clause "New" or "Revised" License +* Project: https://github.com/golang/net +* Source: https://github.com/golang/net/tree/a5a99cb37ef4b68617775ab669177656090ab396 + +eclipse/ditto-clients-golang (0.0.0-20220225085802-cf3b306280d3) + +* License: Eclipse Public License v2.0 +* Project: https://github.com/eclipse/ditto-clients-golang +* Source: https://github.com/eclipse/ditto-clients-golang/tree/cf3b306280d3453473ae8dd65e78f978e2c838ea + ## Cryptography Content may contain encryption software. The country in which you are currently diff --git a/integration/util/config.go b/integration/util/config.go new file mode 100644 index 00000000..d7def707 --- /dev/null +++ b/integration/util/config.go @@ -0,0 +1,36 @@ +// Copyright (c) 2022 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + +package util + +import ( + "time" +) + +// TestConfiguration is a common integration test configuration +type TestConfiguration struct { + LocalBroker string `env:"LOCAL_BROKER" envDefault:"tcp://localhost:1883"` + MqttQuiesceMs int `env:"MQTT_QUIESCE_MS" envDefault:"500"` + MqttAcknowledgeTimeoutMs int `env:"MQTT_ACKNOWLEDGE_TIMEOUT_MS" envDefault:"3000"` + MqttConnectMs int `env:"MQTT_CONNECT_TIMEOUT_MS" envDefault:"30000"` + + DigitalTwinAPIAddress string `env:"DIGITAL_TWIN_API_ADDRESS"` + DigitalTwinAPIUsername string `env:"DIGITAL_TWIN_API_USERNAME" envDefault:"ditto"` + DigitalTwinAPIPassword string `env:"DIGITAL_TWIN_API_PASSWORD" envDefault:"ditto"` + + WsEventTimeoutMs int `env:"WS_EVENT_TIMEOUT_MS" envDefault:"30000"` +} + +// MillisToDuration converts milliseconds to Duration +func MillisToDuration(millis int) time.Duration { + return time.Duration(millis) * time.Millisecond +} diff --git a/integration/util/go.mod b/integration/util/go.mod new file mode 100644 index 00000000..87e530f3 --- /dev/null +++ b/integration/util/go.mod @@ -0,0 +1,20 @@ +module github.com/eclipse-kanto/kanto/integration/util + +go 1.17 + +require ( + github.com/caarlos0/env/v6 v6.10.1 + github.com/eclipse/ditto-clients-golang v0.0.0-20220225085802-cf3b306280d3 + github.com/eclipse/paho.mqtt.golang v1.4.1 + github.com/google/uuid v1.3.0 + github.com/stretchr/testify v1.8.1 + golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gorilla/websocket v1.4.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/integration/util/go.sum b/integration/util/go.sum new file mode 100644 index 00000000..e1e78a4a --- /dev/null +++ b/integration/util/go.sum @@ -0,0 +1,58 @@ +github.com/caarlos0/env/v6 v6.10.1 h1:t1mPSxNpei6M5yAeu1qtRdPAK29Nbcf/n3G7x+b3/II= +github.com/caarlos0/env/v6 v6.10.1/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/ditto-clients-golang v0.0.0-20220225085802-cf3b306280d3 h1:bfFGs26yNSfhSi6xmnmykB0jZn1Vu5e1/7JA5Wu5aGc= +github.com/eclipse/ditto-clients-golang v0.0.0-20220225085802-cf3b306280d3/go.mod h1:ey7YwfHSQJsinGkGbgeEgqZA7qJnoB0YiFVTFEY50Jg= +github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= +github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI= +github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/integration/util/mqtt_client.go b/integration/util/mqtt_client.go new file mode 100644 index 00000000..47bee740 --- /dev/null +++ b/integration/util/mqtt_client.go @@ -0,0 +1,114 @@ +// Copyright (c) 2022 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + +package util + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/google/uuid" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +const ( + topicThingCfgRequest = "edge/thing/request" + + topicThingCfgResponse = "edge/thing/response" + + keepAliveTimeout = 20 * time.Second +) + +// NewMQTTClient creates a new MQTT client and connects it to the broker from the test configuration +func NewMQTTClient(cfg *TestConfiguration) (MQTT.Client, error) { + opts := MQTT.NewClientOptions(). + AddBroker(cfg.LocalBroker). + SetClientID(uuid.New().String()). + SetConnectTimeout(MillisToDuration(cfg.MqttConnectMs)). + SetKeepAlive(keepAliveTimeout). + SetCleanSession(true). + SetAutoReconnect(true) + + mqttClient := MQTT.NewClient(opts) + + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + return nil, token.Error() + } + + return mqttClient, nil +} + +// SendMQTTMessage sends a message to a topic using specified client. The message is serialized to JSON format. +func SendMQTTMessage(cfg *TestConfiguration, client MQTT.Client, topic string, message interface{}) error { + payload, err := json.Marshal(message) + if err != nil { + return err + } + token := client.Publish(topic, 1, false, payload) + timeout := MillisToDuration(cfg.MqttAcknowledgeTimeoutMs) + if !token.WaitTimeout(timeout) { + return errors.New("timeout while sending MQTT message") + } + return token.Error() +} + +// ThingConfiguration represents information about the configured thing +type ThingConfiguration struct { + DeviceID string `json:"deviceId"` + TenantID string `json:"tenantId"` + PolicyID string `json:"policyId"` +} + +// GetThingConfiguration retrieves information about the configured thing +func GetThingConfiguration(cfg *TestConfiguration, mqttClient MQTT.Client) (*ThingConfiguration, error) { + type result struct { + cfg *ThingConfiguration + err error + } + + ch := make(chan result) + + token := mqttClient.Subscribe(topicThingCfgResponse, 1, func(client MQTT.Client, message MQTT.Message) { + var cfg ThingConfiguration + if err := json.Unmarshal(message.Payload(), &cfg); err != nil { + ch <- result{nil, err} + } + ch <- result{&cfg, nil} + }) + if !token.WaitTimeout(MillisToDuration(cfg.MqttAcknowledgeTimeoutMs)) { + return nil, errors.New("timeout subscribing to thing configuration response") + } + if token.Error() != nil { + return nil, token.Error() + } + + defer mqttClient.Unsubscribe(topicThingCfgResponse) + + token = mqttClient.Publish(topicThingCfgRequest, 1, false, "") + if !token.WaitTimeout(MillisToDuration(cfg.MqttAcknowledgeTimeoutMs)) { + return nil, errors.New("timeout publishing thing configuration request") + } + if token.Error() != nil { + return nil, token.Error() + } + + timeout := 5 * time.Second + select { + case result := <-ch: + return result.cfg, result.err + case <-time.After(timeout): + return nil, fmt.Errorf("thing config not received in %v", timeout) + } +} diff --git a/integration/util/suite.go b/integration/util/suite.go new file mode 100644 index 00000000..e9ecd1c0 --- /dev/null +++ b/integration/util/suite.go @@ -0,0 +1,66 @@ +// Copyright (c) 2022 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + +package util + +import ( + "testing" + + "github.com/caarlos0/env/v6" + + "github.com/eclipse/ditto-clients-golang" + + MQTT "github.com/eclipse/paho.mqtt.golang" + + "github.com/stretchr/testify/require" +) + +// SuiteInitializer is testify Suite initialization helper +type SuiteInitializer struct { + Cfg *TestConfiguration + + DittoClient *ditto.Client + MQTTClient MQTT.Client +} + +// Setup establishes connections to the local MQTT broker and Ditto +func (suite *SuiteInitializer) Setup(t *testing.T) { + cfg := &TestConfiguration{} + + opts := env.Options{RequiredIfNoDef: true} + require.NoError(t, env.Parse(cfg, opts), "failed to process environment variables") + + t.Logf("%#v\n", cfg) + + mqttClient, err := NewMQTTClient(cfg) + require.NoError(t, err, "connect to MQTT broker") + + dittoClient, err := ditto.NewClientMQTT(mqttClient, ditto.NewConfiguration()) + if err == nil { + err = dittoClient.Connect() + } + + if err != nil { + mqttClient.Disconnect(uint(cfg.MqttQuiesceMs)) + require.NoError(t, err, "initialize ditto client") + } + + suite.Cfg = cfg + suite.DittoClient = dittoClient + suite.MQTTClient = mqttClient +} + +// TearDown closes all connections +func (suite *SuiteInitializer) TearDown() { + suite.DittoClient.Disconnect() + suite.MQTTClient.Disconnect(uint(suite.Cfg.MqttQuiesceMs)) +} diff --git a/integration/util/web.go b/integration/util/web.go new file mode 100644 index 00000000..475ca68d --- /dev/null +++ b/integration/util/web.go @@ -0,0 +1,165 @@ +// Copyright (c) 2022 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + +package util + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/eclipse/ditto-clients-golang/protocol" + "github.com/google/uuid" + "golang.org/x/net/websocket" +) + +// SendDigitalTwinRequest sends а new HTTP request to Ditto REST API +func SendDigitalTwinRequest(cfg *TestConfiguration, method string, url string, body interface{}) ([]byte, error) { + var reqBody io.Reader + + if body != nil { + jsonValue, err := json.Marshal(body) + if err != nil { + return nil, err + } + reqBody = bytes.NewBuffer(jsonValue) + } + + req, err := http.NewRequest(method, url, reqBody) + if err != nil { + return nil, err + } + + if body != nil { + correlationID := uuid.New().String() + req.Header.Add("Content-Type", "application/json") + req.Header.Add("correlation-id", correlationID) + req.Header.Add("response-required", "true") + } + + req.SetBasicAuth(cfg.DigitalTwinAPIUsername, cfg.DigitalTwinAPIPassword) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("%s %s request failed: %s", method, url, resp.Status) + } + + return io.ReadAll(resp.Body) +} + +// NewDigitalTwinWSConnection creates new web socket connection +func NewDigitalTwinWSConnection(cfg *TestConfiguration) (*websocket.Conn, error) { + wsAddress, err := asWSAddress(cfg.DigitalTwinAPIAddress) + if err != nil { + return nil, err + } + + url := fmt.Sprintf("%s/ws/2", wsAddress) + wsCfg, err := websocket.NewConfig(url, cfg.DigitalTwinAPIAddress) + if err != nil { + return nil, err + } + + auth := fmt.Sprintf("%s:%s", cfg.DigitalTwinAPIUsername, cfg.DigitalTwinAPIPassword) + enc := base64.StdEncoding.EncodeToString([]byte(auth)) + wsCfg.Header = http.Header{ + "Authorization": {"Basic " + enc}, + } + + return websocket.DialConfig(wsCfg) +} + +func getPortOrDefault(url *url.URL, defaultPort string) string { + port := url.Port() + if port == "" { + return defaultPort + } + return port +} + +func asWSAddress(address string) (string, error) { + url, err := url.Parse(address) + if err != nil { + return "", err + } + + if url.Scheme == "https" { + return fmt.Sprintf("wss://%s:%s", url.Hostname(), getPortOrDefault(url, "443")), nil + } + + return fmt.Sprintf("ws://%s:%s", url.Hostname(), getPortOrDefault(url, "80")), nil +} + +// WaitForWSMessage polls messages from the web socket connection until specific message is received or timeout expires +func WaitForWSMessage(cfg *TestConfiguration, ws *websocket.Conn, expectedMessage string) error { + deadline := time.Now().Add(MillisToDuration(cfg.WsEventTimeoutMs)) + ws.SetDeadline(deadline) + + var payload []byte + for time.Now().Before(deadline) { + err := websocket.Message.Receive(ws, &payload) + if err != nil { + return fmt.Errorf("error reading from websocket: %v", err) + } + message := strings.TrimSpace(string(payload)) + if message == expectedMessage { + return nil + } + } + + return errors.New("timeout waiting for web socket message") +} + +// ProcessWSMessages polls messages from the web socket connection until specific condition is satisfied or timeout expires +func ProcessWSMessages(cfg *TestConfiguration, ws *websocket.Conn, process func(*protocol.Envelope) (bool, error)) error { + timeout := MillisToDuration(cfg.WsEventTimeoutMs) + deadline := time.Now().Add(timeout) + ws.SetDeadline(deadline) + + var err error + finished := false + + for !finished && time.Now().Before(deadline) { + var payload []byte + webSocketErr := websocket.Message.Receive(ws, &payload) + if webSocketErr != nil { + return fmt.Errorf("error reading from websocket: %v", webSocketErr) + } + + envelope := &protocol.Envelope{} + unmarshalErr := json.Unmarshal(payload, envelope) + if unmarshalErr == nil { + finished, err = process(envelope) + } else { + // Unmarshalling error, the payload is not a JSON of protocol.Envelope + // Ignore the error + } + } + + if !finished { + return fmt.Errorf("not finished, expected WS response not received in %v, last error: %v", timeout, err) + } + + return err +}