diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7d4db4047..c6f9d1dbe 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,7 +28,8 @@ jobs: include: # test with check race with coverage and sonarcloud - name: test - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false checkRace: "true" coapGateway: log: @@ -36,7 +37,8 @@ jobs: dumpBody: "true" - name: test/cqldb - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false checkRace: "true" database: "cqldb" coapGateway: @@ -46,14 +48,16 @@ jobs: # test without check race - name: test/norace - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false coapGateway: log: level: "debug" dumpBody: "true" - name: test/norace/cqldb - cmd: test + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false database: "cqldb" coapGateway: log: @@ -65,16 +69,16 @@ jobs: # - with ECDSA-SHA256 signature and P384 elliptic curve certificates # - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_USE_UUID - name: test/norace-384 - cmd: test - args: CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false CERT_TOOL_SIGN_ALG=ECDSA-SHA384 CERT_TOOL_ELLIPTIC_CURVE=P384 TEST_LEAD_RESOURCE_TYPE_FILTER=last TEST_LEAD_RESOURCE_TYPE_USE_UUID=true # test # - without check race # - with TEST_LEAD_RESOURCE_TYPE_FILTER, TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER # - with logs from all services - name: test/norace/logs - cmd: test - args: TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first + cmd: test-device-provisioning-service + args: TEST_COAP_GATEWAY_UDP_ENABLED=false TEST_LEAD_RESOURCE_TYPE_REGEX_FILTER='oic\.wk\.d,^/light/\d+$$' TEST_LEAD_RESOURCE_TYPE_FILTER=first coapGateway: log: level: "debug" diff --git a/coap-gateway/test/test.go b/coap-gateway/test/test.go index 206d70111..a25ad8e05 100644 --- a/coap-gateway/test/test.go +++ b/coap-gateway/test/test.go @@ -71,7 +71,7 @@ func SetUp(t require.TestingT) (tearDown func()) { return New(t, MakeConfig(t)) } -func checkForClosedSockets(t require.TestingT, cfg service.Config) { +func checkForClosedSockets(cfg service.Config) error { sockets := make(test.ListenSockets, 0, len(cfg.APIs.COAP.Protocols)) for _, protocol := range cfg.APIs.COAP.Protocols { sockets = append(sockets, test.ListenSocket{ @@ -79,7 +79,7 @@ func checkForClosedSockets(t require.TestingT, cfg service.Config) { Address: cfg.APIs.COAP.Addr, }) } - sockets.CheckForClosedSockets(t) + return sockets.CheckForClosedSockets() } // New creates test coap-gateway. @@ -106,6 +106,7 @@ func New(t require.TestingT, cfg service.Config) func() { err = fileWatcher.Close() require.NoError(t, err) - checkForClosedSockets(t, cfg) + err = checkForClosedSockets(cfg) + require.NoError(t, err) } } diff --git a/device-provisioning-service/service/provision_test.go b/device-provisioning-service/service/provision_test.go index dd489f31f..b36fe054a 100644 --- a/device-provisioning-service/service/provision_test.go +++ b/device-provisioning-service/service/provision_test.go @@ -323,7 +323,7 @@ func TestProvisioningWithPSK(t *testing.T) { hubTestService.SetUpServicesId|hubTestService.SetUpServicesResourceAggregate|hubTestService.SetUpServicesGrpcGateway) defer hubShutdown() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3600) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() ctx = pkgGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) diff --git a/device-provisioning-service/service/service.go b/device-provisioning-service/service/service.go index 8c6e6a228..001d70118 100644 --- a/device-provisioning-service/service/service.go +++ b/device-provisioning-service/service/service.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net" + "os/exec" "time" "github.com/pion/dtls/v2" @@ -27,7 +28,6 @@ import ( otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client" "github.com/plgd-dev/hub/v2/pkg/opentelemetry/otelcoap" "github.com/plgd-dev/hub/v2/pkg/service" - "github.com/plgd-dev/hub/v2/pkg/sync/task/queue" otelCodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -36,7 +36,6 @@ type Service struct { config Config ctx context.Context cancel context.CancelFunc - taskQueue *queue.Queue messagePool *pool.Pool linkedHubCache *LinkedHubCache store *mongodb.Store @@ -127,16 +126,10 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg return nil, fmt.Errorf("cannot create open telemetry collector client: %w", err) } otelClient.AddCloseFunc(cancel) - tracerProvider := otelClient.GetTracerProvider() + var closer fn.FuncList closer.AddFunc(otelClient.Close) - queue, err := queue.New(config.TaskQueue) - if err != nil { - closer.Execute() - return nil, fmt.Errorf("cannot create job queue %w", err) - } - closer.AddFunc(queue.Release) - + tracerProvider := otelClient.GetTracerProvider() store, closeStore, err := NewStore(ctx, config.Clients.Storage.MongoDB, fileWatcher, logger, tracerProvider) if err != nil { closer.Execute() @@ -180,7 +173,6 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg s := Service{ config: config, linkedHubCache: linkedHubCache, - taskQueue: queue, ctx: ctx, cancel: cancel, @@ -194,6 +186,12 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg enrollmentGroupsCache: enrollmentGroupsCache, } + cmd := exec.Command("netstat", "-tulpn") + stdout, err := cmd.Output() + if err == nil { + fmt.Println(string(stdout)) + } + ss, err := s.createServices(fileWatcher, logger) if err != nil { if httpService != nil { diff --git a/device-provisioning-service/test/provisionHandler.go b/device-provisioning-service/test/provisionHandler.go index 0c23c9af2..63bdd13a5 100644 --- a/device-provisioning-service/test/provisionHandler.go +++ b/device-provisioning-service/test/provisionHandler.go @@ -43,7 +43,7 @@ func (h *RequestHandlerWithDps) IsStarted() bool { func (h *RequestHandlerWithDps) StartDps(opts ...service.Option) { if h.dpsShutdown != nil { - return + panic("dps already started") } h.Logf("start provisioning") h.dpsShutdown = New(h.t, h.dpsCfg, opts...) diff --git a/device-provisioning-service/test/test.go b/device-provisioning-service/test/test.go index 813b6757a..05aadbf69 100644 --- a/device-provisioning-service/test/test.go +++ b/device-provisioning-service/test/test.go @@ -186,6 +186,31 @@ func init() { }, }, } + + if err := checkForClosedSockets(MakeAPIsConfig()); err != nil { + panic(err) + } +} + +func MakeAPIsConfig() service.APIsConfig { + var cfg service.APIsConfig + cfg.COAP.Addr = DPSHost + cfg.COAP.MaxMessageSize = 256 * 1024 + cfg.COAP.MessagePoolSize = 1000 + cfg.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP} + if config.DPS_UDP_ENABLED { + cfg.COAP.Protocols = append(cfg.COAP.Protocols, pkgCoapService.UDP) + } + cfg.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{ + Timeout: time.Second * 20, + } + cfg.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED + cfg.COAP.BlockwiseTransfer.SZX = "1024" + cfg.HTTP = MakeHTTPConfig() + tlsServerCfg := config.MakeTLSServerConfig() + cfg.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile + cfg.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile + return cfg } func MakeConfig(t require.TestingT) service.Config { @@ -194,27 +219,11 @@ func MakeConfig(t require.TestingT) service.Config { cfg.Log = log.MakeDefaultConfig() cfg.TaskQueue.GoPoolSize = 1600 cfg.TaskQueue.Size = 2 * 1024 * 1024 - cfg.APIs.COAP.Addr = DPSHost - cfg.APIs.COAP.MaxMessageSize = 256 * 1024 - cfg.APIs.COAP.MessagePoolSize = 1000 - cfg.APIs.COAP.Protocols = []pkgCoapService.Protocol{pkgCoapService.TCP} - if config.DPS_UDP_ENABLED { - cfg.APIs.COAP.Protocols = append(cfg.APIs.COAP.Protocols, pkgCoapService.UDP) - } - cfg.APIs.COAP.InactivityMonitor = &pkgCoapService.InactivityMonitor{ - Timeout: time.Second * 20, - } - cfg.APIs.COAP.BlockwiseTransfer.Enabled = config.DPS_UDP_ENABLED - cfg.APIs.COAP.BlockwiseTransfer.SZX = "1024" - cfg.APIs.HTTP = MakeHTTPConfig() - tlsServerCfg := config.MakeTLSServerConfig() - cfg.APIs.COAP.TLS.Embedded.CertFile = tlsServerCfg.CertFile - cfg.APIs.COAP.TLS.Embedded.KeyFile = tlsServerCfg.KeyFile + cfg.APIs = MakeAPIsConfig() cfg.Clients.Storage = MakeStorageConfig() cfg.Clients.OpenTelemetryCollector = pkgHttp.OpenTelemetryCollectorConfig{ Config: config.MakeOpenTelemetryCollectorClient(), } - cfg.EnrollmentGroups = append(cfg.EnrollmentGroups, MakeEnrollmentGroup()) err := cfg.Validate() require.NoError(t, err) @@ -262,21 +271,21 @@ func New(t *testing.T, cfg service.Config, opts ...service.Option) func() { return NewWithContext(context.Background(), t, cfg, opts...) } -func checkForClosedSockets(t require.TestingT, cfg service.Config) { - sockets := make(hubTest.ListenSockets, 0, len(cfg.APIs.COAP.Protocols)+1) - for _, protocol := range cfg.APIs.COAP.Protocols { +func checkForClosedSockets(cfg service.APIsConfig) error { + sockets := make(hubTest.ListenSockets, 0, len(cfg.COAP.Protocols)+1) + for _, protocol := range cfg.COAP.Protocols { sockets = append(sockets, hubTest.ListenSocket{ Network: string(protocol), - Address: cfg.APIs.COAP.Addr, + Address: cfg.COAP.Addr, }) } - if cfg.APIs.HTTP.Enabled { + if cfg.HTTP.Enabled { sockets = append(sockets, hubTest.ListenSocket{ Network: "tcp", - Address: cfg.APIs.HTTP.Config.Connection.Addr, + Address: cfg.HTTP.Config.Connection.Addr, }) } - sockets.CheckForClosedSockets(t) + return sockets.CheckForClosedSockets() } // New creates test dps-gateway. @@ -302,9 +311,8 @@ func NewWithContext(ctx context.Context, t *testing.T, cfg service.Config, opts err = fileWatcher.Close() require.NoError(t, err) - checkForClosedSockets(t, cfg) - // wait for all connections to be closed - time.Sleep(time.Millisecond * 500) + err = checkForClosedSockets(cfg.APIs) + require.NoError(t, err) } } diff --git a/pkg/net/coap/service/tcpServer.go b/pkg/net/coap/service/tcpServer.go index 96a9439a8..a8dd765ee 100644 --- a/pkg/net/coap/service/tcpServer.go +++ b/pkg/net/coap/service/tcpServer.go @@ -77,6 +77,7 @@ func newTCPServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Watc if err != nil { return nil, fmt.Errorf("cannot create listener: %w", err) } + fmt.Printf("tcp listerer(%v) opened\n", config.Addr) tcpOpts := make([]coapTcpServer.Option, 0, 3) if serviceOpts.OnNewConnection != nil { tcpOpts = append(tcpOpts, options.WithOnNewConn(func(cc *coapTcpClient.Conn) { diff --git a/pkg/net/coap/service/udpServer.go b/pkg/net/coap/service/udpServer.go index d199a4c85..e39c13824 100644 --- a/pkg/net/coap/service/udpServer.go +++ b/pkg/net/coap/service/udpServer.go @@ -141,6 +141,7 @@ func newDTLSServer(config Config, serviceOpts Options, fileWatcher *fsnotify.Wat if err != nil { return nil, fmt.Errorf("cannot create listener: %w", err) } + fmt.Printf("dtls listerer(%v) opened\n", config.Addr) dtlsOpts := make([]coapDtlsServer.Option, 0, 4) if serviceOpts.OnNewConnection != nil { dtlsOpts = append(dtlsOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) { @@ -177,6 +178,7 @@ func newUDPServer(config Config, serviceOpts Options, logger log.Logger, opts .. if err != nil { return nil, fmt.Errorf("cannot create listener: %w", err) } + fmt.Printf("udp listerer(%v) opened\n", config.Addr) udpOpts := make([]coapUdpServer.Option, 0, 4) if serviceOpts.OnNewConnection != nil { udpOpts = append(udpOpts, options.WithOnNewConn(func(coapConn *coapUdpClient.Conn) { diff --git a/test/test.go b/test/test.go index 343a24aed..bb80b45c2 100644 --- a/test/test.go +++ b/test/test.go @@ -1012,7 +1012,6 @@ func (ls *ListenSocket) IsClosed() (bool, error) { } c, err := net.ListenUDP(ls.Network, addr) if err != nil { - fmt.Printf("ListenUDP error: %v\n", err) return false, nil } err = c.Close() @@ -1028,7 +1027,6 @@ func (ls *ListenSocket) IsClosed() (bool, error) { } c, err := net.ListenTCP(ls.Network, addr) if err != nil { - fmt.Printf("ListenTCP error: %v\n", err) return false, nil } err = c.Close() @@ -1040,7 +1038,7 @@ func (ls *ListenSocket) IsClosed() (bool, error) { type ListenSockets []ListenSocket -func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) { +func (ls ListenSockets) CheckForClosedSockets() error { // wait for all sockets to be closed - max 3 minutes = 900*200 socketClosed := make([]bool, len(ls)) for j := 0; j < 900; j++ { @@ -1050,7 +1048,9 @@ func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) { continue } closed, err := socket.IsClosed() - require.NoError(t, err) + if err != nil { + return err + } socketClosed[i] = closed if socketClosed[i] { continue @@ -1058,8 +1058,9 @@ func (ls ListenSockets) CheckForClosedSockets(t require.TestingT) { allClosed = false } if allClosed { - break + return nil } time.Sleep(time.Millisecond * 200) } + return errors.New("ports not closed") }