From 0ff1e966c00e60aa97ea4dbf8c7f3de46eb76056 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Thu, 15 Aug 2024 14:30:59 +0200 Subject: [PATCH] Closing of sockets --- .github/workflows/test.yml | 20 +++--- coap-gateway/test/test.go | 7 ++- .../service/service.go | 13 +--- .../test/provisionHandler.go | 2 +- device-provisioning-service/test/test.go | 62 +++++++++++-------- test/test.go | 11 ++-- 6 files changed, 60 insertions(+), 55 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7d4db4047..c6f9d1dbe 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,7 +28,8 @@ jobs: include: # test with check race with coverage and sonarcloud - name: test - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false checkRace: "true" coapGateway: log: @@ -36,7 +37,8 @@ jobs: dumpBody: "true" - name: test/cqldb - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false checkRace: "true" database: "cqldb" coapGateway: @@ -46,14 +48,16 @@ jobs: # test without check race - name: test/norace - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false coapGateway: log: level: "debug" dumpBody: "true" - name: test/norace/cqldb - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false database: "cqldb" coapGateway: log: @@ -65,16 +69,16 @@ jobs: # - with ECDSA-SHA256 signature and P384 elliptic curve certificates # - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_USE_UUID - name: test/norace-384 - cmd: test - args: CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true # test # - without check race # - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER # - with logs from all services - name: test/norace/logs - cmd: test - args: TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first coapGateway: log: level: "debug" diff --git a/coap-gateway/test/test.go b/coap-gateway/test/test.go index 206d70111..a25ad8e05 100644 --- a/coap-gateway/test/test.go +++ b/coap-gateway/test/test.go @@ -71,7 +71,7 @@ func SetUp(t require.TestingT) (tearDown func()) { return New(t, MakeConfig(t)) } -func checkForClosedSockets(t require.TestingT, cfg service.Config) { +func checkForClosedSockets(cfg service.Config) error { sockets := make(test.ListenSockets, 0, len(cfg.APIs.COAP.Protocols)) for _, protocol := range cfg.APIs.COAP.Protocols { sockets = append(sockets, test.ListenSocket{ @@ -79,7 +79,7 @@ func checkForClosedSockets(t require.TestingT, cfg service.Config) { Address: cfg.APIs.COAP.Addr, }) } - sockets.CheckForClosedSockets(t) + return sockets.CheckForClosedSockets() } // New creates test coap-gateway. @@ -106,6 +106,7 @@ func New(t require.TestingT, cfg service.Config) func() { err = fileWatcher.Close() require.NoError(t, err) - checkForClosedSockets(t, cfg) + err = checkForClosedSockets(cfg) + require.NoError(t, err) } } diff --git a/device-provisioning-service/service/service.go b/device-provisioning-service/service/service.go index 8c6e6a228..e2373f97c 100644 --- a/device-provisioning-service/service/service.go +++ b/device-provisioning-service/service/service.go @@ -27,7 +27,6 @@ import ( otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" "github.com/plgd-dev/hub/v2/pkg/opentelemetry/otelcoap" "github.com/plgd-dev/hub/v2/pkg/service" - "github.com/plgd-dev/hub/v2/pkg/sync/task/queue" otelCodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -36,7 +35,6 @@ type Service struct { config Config ctx context.Context cancel context.CancelFunc - taskQueue *queue.Queue messagePool *pool.Pool linkedHubCache *LinkedHubCache store *mongodb.Store @@ -127,16 +125,10 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg return nil, fmt.Errorf("cannot create open telemetry collector client: %w", err) } otelClient.AddCloseFunc(cancel) - tracerProvider := otelClient.GetTracerProvider() + var closer fn.FuncList closer.AddFunc(otelClient.Close) - queue, err := queue.New(config.TaskQueue) - if err != nil { - closer.Execute() - return nil, fmt.Errorf("cannot create job queue %w", err) - } - closer.AddFunc(queue.Release) - + tracerProvider := otelClient.GetTracerProvider() store, closeStore, err := NewStore(ctx, config.Clients.Storage.MongoDB, fileWatcher, logger, tracerProvider) if err != nil { closer.Execute() @@ -180,7 +172,6 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg s := Service{ config: config, linkedHubCache: linkedHubCache, - taskQueue: queue, ctx: ctx, cancel: cancel, diff --git a/device-provisioning-service/test/provisionHandler.go b/device-provisioning-service/test/provisionHandler.go index 0c23c9af2..63bdd13a5 100644 --- a/device-provisioning-service/test/provisionHandler.go +++ b/device-provisioning-service/test/provisionHandler.go @@ -43,7 +43,7 @@ func (h *RequestHandlerWithDps) IsStarted() bool { func (h *RequestHandlerWithDps) StartDps(opts ...service.Option) { if h.dpsShutdown != nil { - return + panic("dps already started") } h.Logf("start provisioning") h.dpsShutdown = New(h.t, h.dpsCfg, opts...) diff --git a/device-provisioning-service/test/test.go b/device-provisioning-service/test/test.go index 813b6757a..05aadbf69 100644 --- a/device-provisioning-service/test/test.go +++ b/device-provisioning-service/test/test.go @@ -186,6 +186,31 @@ func init() { }, }, } + + if err := checkForClosedSockets(MakeAPIsConfig()); err != nil { + panic(err) + } +} + +func MakeAPIsConfig() service.APIsConfig { + var cfg service.APIsConfig + cfg.COAP.Addr = DPSHost + cfg.COAP.MaxMessageSize = 256 * 1024 + cfg.COAP.MessagePoolSize = 1000 + cfg.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP} + if config.DPS_UDP_ENABLED { + cfg.COAP.Protocols = append(cfg.COAP.Protocols, pkgCoapService.UDP) + } + cfg.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{ + Timeout: time.Second * 20, + } + cfg.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED + cfg.COAP.BlockwiseTransfer.SZX = "1024" + cfg.HTTP = MakeHTTPConfig() + tlsServerCfg := config.MakeTLSServerConfig() + cfg.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile + cfg.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile + return cfg } func MakeConfig(t require.TestingT) service.Config { @@ -194,27 +219,11 @@ func MakeConfig(t require.TestingT) service.Config { cfg.Log = log.MakeDefaultConfig() cfg.TaskQueue.GoPoolSize = 1600 cfg.TaskQueue.Size = 2 * 1024 * 1024 - cfg.APIs.COAP.Addr = DPSHost - cfg.APIs.COAP.MaxMessageSize = 256 * 1024 - cfg.APIs.COAP.MessagePoolSize = 1000 - cfg.APIs.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP} - if config.DPS_UDP_ENABLED { - cfg.APIs.COAP.Protocols = append(cfg.APIs.COAP.Protocols, pkgCoapService.UDP) - } - cfg.APIs.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{ - Timeout: time.Second * 20, - } - cfg.APIs.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED - cfg.APIs.COAP.BlockwiseTransfer.SZX = "1024" - cfg.APIs.HTTP = MakeHTTPConfig() - tlsServerCfg := config.MakeTLSServerConfig() - cfg.APIs.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile - cfg.APIs.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile + cfg.APIs = MakeAPIsConfig() cfg.Clients.Storage = MakeStorageConfig() cfg.Clients.OpenTelemetryCollector = pkgHttp.OpenTelemetryCollectorConfig{ Config: config.MakeOpenTelemetryCollectorClient(), } - cfg.EnrollmentGroups = append(cfg.EnrollmentGroups, MakeEnrollmentGroup()) err := cfg.Validate() require.NoError(t, err) @@ -262,21 +271,21 @@ func New(t *testing.T, cfg service.Config, opts ...service.Option) func() { return NewWithContext(context.Background(), t, cfg, opts...) } -func checkForClosedSockets(t require.TestingT, cfg service.Config) { - sockets := make(hubTest.ListenSockets, 0, len(cfg.APIs.COAP.Protocols)+1) - for _, protocol := range cfg.APIs.COAP.Protocols { +func checkForClosedSockets(cfg service.APIsConfig) error { + sockets := make(hubTest.ListenSockets, 0, len(cfg.COAP.Protocols)+1) + for _, protocol := range cfg.COAP.Protocols { sockets = append(sockets, hubTest.ListenSocket{ Network: string(protocol), - Address: cfg.APIs.COAP.Addr, + Address: cfg.COAP.Addr, }) } - if cfg.APIs.HTTP.Enabled { + if cfg.HTTP.Enabled { sockets = append(sockets, hubTest.ListenSocket{ Network: "tcp", - Address: cfg.APIs.HTTP.Config.Connection.Addr, + Address: cfg.HTTP.Config.Connection.Addr, }) } - sockets.CheckForClosedSockets(t) + return sockets.CheckForClosedSockets() } // New creates test dps-gateway. @@ -302,9 +311,8 @@ func NewWithContext(ctx context.Context, t *testing.T, cfg service.Config, opts err = fileWatcher.Close() require.NoError(t, err) - checkForClosedSockets(t, cfg) - // wait for all connections to be closed - time.Sleep(time.Millisecond * 500) + err = checkForClosedSockets(cfg.APIs) + require.NoError(t, err) } } diff --git a/test/test.go b/test/test.go index 343a24aed..bb80b45c2 100644 --- a/test/test.go +++ b/test/test.go @@ -1012,7 +1012,6 @@ func (ls *ListenSocket) IsClosed() (bool, error) { } c, err := net.ListenUDP(ls.Network, addr) if err != nil { - fmt.Printf("ListenUDP error: %v\n", err) return false, nil } err = c.Close() @@ -1028,7 +1027,6 @@ func (ls *ListenSocket) IsClosed() (bool, error) { } c, err := net.ListenTCP(ls.Network, addr) if err != nil { - fmt.Printf("ListenTCP error: %v\n", err) return false, nil } err = c.Close() @@ -1040,7 +1038,7 @@ func (ls *ListenSocket) IsClosed() (bool, error) { type ListenSockets []ListenSocket -func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) { +func (ls ListenSockets) CheckForClosedSockets() error { // wait for all sockets to be closed - max 3 minutes = 900*200 socketClosed := make([]bool, len(ls)) for j := 0; j < 900; j++ { @@ -1050,7 +1048,9 @@ func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) { continue } closed, err := socket.IsClosed() - require.NoError(t, err) + if err != nil { + return err + } socketClosed[i] = closed if socketClosed[i] { continue @@ -1058,8 +1058,9 @@ func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) { allClosed = false } if allClosed { - break + return nil } time.Sleep(time.Millisecond * 200) } + return errors.New("ports not closed") }