From 10fb096879a162c8dadd021910d86d53e1381e95 Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Mon, 2 Sep 2024 18:15:16 +0100 Subject: [PATCH] Integration test stabilization (#8001) * Integration test stabilization Use collectT in Eventually for daprd/hotreload/selfhosted/subscriptions/http.go Signed-off-by: joshvanl * Wrap publishing message for app ready in Eventually Signed-off-by: joshvanl * Wrap metric check in Eventually Signed-off-by: joshvanl * More stability Signed-off-by: joshvanl * More eventuallys around metrics checks Signed-off-by: joshvanl * Adds check to wait for subscription to be available Signed-off-by: joshvanl * Increase timeout for placement to become ready Signed-off-by: joshvanl * Lint Signed-off-by: joshvanl * Add eventually around metrics Signed-off-by: joshvanl * Increase timeout for job to be triggered Signed-off-by: joshvanl * Reduce number in idtypes Signed-off-by: joshvanl * Increase timeout for metrics Signed-off-by: joshvanl * Trigger immediately Signed-off-by: joshvanl * Move metrics Get inside Eventually Signed-off-by: joshvanl * Adds closeCh check during channel message send Signed-off-by: joshvanl * Print got output in logline failure Signed-off-by: joshvanl * Fix logline error got output newline Signed-off-by: joshvanl --------- Signed-off-by: joshvanl Co-authored-by: Loong Dai Co-authored-by: Yaron Schneider --- pkg/placement/placement.go | 6 +++- .../framework/process/logline/logline.go | 6 +++- .../framework/process/placement/placement.go | 20 ++++++------- .../actors/reminders/scheduler/idtypes.go | 2 +- .../selfhosted/subscriptions/http.go | 2 +- .../integration/suite/daprd/jobs/http/api.go | 3 +- .../suite/daprd/jobs/streaming/stream.go | 1 + .../suite/daprd/metrics/grpc/basic.go | 15 ++++++---- .../suite/daprd/metrics/http/basic.go | 6 ++-- .../daprd/metrics/http/cardinality/low.go | 29 ++++++++++++------- .../metrics/http/distributionbuckets/high.go | 17 ++++++----- .../metrics/http/excludeverbs/default.go | 5 +++- .../metrics/http/excludeverbs/excludeverbs.go | 7 +++-- .../suite/daprd/pubsub/grpc/appready.go | 27 +++++++++-------- .../suite/daprd/shutdown/block/app/healthy.go | 3 +- .../daprd/shutdown/block/app/pubsub/single.go | 3 ++ 16 files changed, 95 insertions(+), 57 deletions(-) diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 202a46e3c56..062b9e72d89 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -359,9 +359,13 @@ func (p *Service) ReportDaprStatus(stream placementv1pb.Placement_ReportDaprStat } if isActorRuntime { - p.membershipCh <- hostMemberChange{ + select { + case p.membershipCh <- hostMemberChange{ cmdType: raft.MemberRemove, host: raft.DaprHostMember{Name: registeredMemberID, Namespace: namespace}, + }: + case <-p.closedCh: + return errors.New("placement service is closed") } } diff --git a/tests/integration/framework/process/logline/logline.go b/tests/integration/framework/process/logline/logline.go index 547519becc0..90ea26a3e5d 100644 --- a/tests/integration/framework/process/logline/logline.go +++ b/tests/integration/framework/process/logline/logline.go @@ -15,6 +15,7 @@ package logline import ( "bufio" + "bytes" "context" "errors" "io" @@ -39,6 +40,7 @@ type LogLine struct { stderr io.ReadCloser stderrExp io.WriteCloser stderrLinContains map[string]bool + got bytes.Buffer outCheck chan map[string]bool closeCh chan struct{} @@ -98,7 +100,7 @@ func (l *LogLine) Cleanup(t *testing.T) { close(l.closeCh) for i := 0; i < 2; i++ { for expLine := range <-l.outCheck { - assert.Fail(t, "expected to log line", expLine) + assert.Fail(t, "expected to log line: "+expLine, l.got.String()) } } } @@ -134,6 +136,8 @@ func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[strin } require.NoError(t, err) + l.got.Write(append(line, '\n')) + for expLine := range expLines { if strings.Contains(string(line), expLine) { delete(expLines, expLine) diff --git a/tests/integration/framework/process/placement/placement.go b/tests/integration/framework/process/placement/placement.go index 0e1766e791e..5c8bfdfbc8b 100644 --- a/tests/integration/framework/process/placement/placement.go +++ b/tests/integration/framework/process/placement/placement.go @@ -133,19 +133,19 @@ func (p *Placement) Cleanup(t *testing.T) { } func (p *Placement) WaitUntilRunning(t *testing.T, ctx context.Context) { + t.Helper() + client := client.HTTP(t) - assert.Eventually(t, func() bool { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/healthz", p.healthzPort), nil) - if err != nil { - return false - } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/healthz", p.healthzPort), nil) + require.NoError(t, err) + assert.EventuallyWithT(t, func(c *assert.CollectT) { resp, err := client.Do(req) - if err != nil { - return false + //nolint:testifylint + if assert.NoError(c, err) { + defer resp.Body.Close() + assert.Equal(c, http.StatusOK, resp.StatusCode) } - defer resp.Body.Close() - return http.StatusOK == resp.StatusCode - }, time.Second*5, 10*time.Millisecond) + }, time.Second*10, 10*time.Millisecond) } func (p *Placement) ID() string { diff --git a/tests/integration/suite/actors/reminders/scheduler/idtypes.go b/tests/integration/suite/actors/reminders/scheduler/idtypes.go index 0276be731e3..f983f9a6459 100644 --- a/tests/integration/suite/actors/reminders/scheduler/idtypes.go +++ b/tests/integration/suite/actors/reminders/scheduler/idtypes.go @@ -84,7 +84,7 @@ spec: i.daprdsNum = 4 i.actorTypesNum = 2 - i.actorIDsNum = 25 + i.actorIDsNum = 15 i.daprds = make([]*daprd.Daprd, i.daprdsNum) i.actorDaprds = make([]actordaprd, i.daprdsNum) procs := make([]process.Interface, i.daprdsNum*2+2) diff --git a/tests/integration/suite/daprd/hotreload/selfhosted/subscriptions/http.go b/tests/integration/suite/daprd/hotreload/selfhosted/subscriptions/http.go index 0ab51f3cac2..faab3721f77 100644 --- a/tests/integration/suite/daprd/hotreload/selfhosted/subscriptions/http.go +++ b/tests/integration/suite/daprd/hotreload/selfhosted/subscriptions/http.go @@ -283,7 +283,7 @@ spec: require.NoError(t, os.Remove(filepath.Join(h.resDir2, "pubsub.yaml"))) assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Len(c, h.daprd.GetMetaRegisteredComponents(t, ctx), 1) + assert.Len(c, h.daprd.GetMetaRegisteredComponents(c, ctx), 1) }, time.Second*5, time.Millisecond*10) h.sub.ExpectPublishReceive(t, ctx, newReq(h.daprd, "pubsub0", "d")) h.sub.ExpectPublishError(t, ctx, newReq(h.daprd, "pubsub1", "c")) diff --git a/tests/integration/suite/daprd/jobs/http/api.go b/tests/integration/suite/daprd/jobs/http/api.go index dee6241c6cf..24a48f5c6ea 100644 --- a/tests/integration/suite/daprd/jobs/http/api.go +++ b/tests/integration/suite/daprd/jobs/http/api.go @@ -90,6 +90,7 @@ func (a *api) Run(t *testing.T, ctx context.Context) { body := `{ "schedule": "@every 1s", "repeats": 10, +"dueTime": "0s", "data": { "@type": "type.googleapis.com/google.protobuf.StringValue", "value": "\"someData\"" @@ -119,7 +120,7 @@ func (a *api) Run(t *testing.T, ctx context.Context) { assert.Equal(t, []byte(`"someData"`), bytes.TrimSpace(job.GetData().GetValue())) assert.Equal(t, "type.googleapis.com/google.protobuf.StringValue", job.GetData().GetTypeUrl()) - case <-time.After(time.Second * 3): + case <-time.After(time.Second * 10): assert.Fail(t, "timed out waiting for triggered job") } diff --git a/tests/integration/suite/daprd/jobs/streaming/stream.go b/tests/integration/suite/daprd/jobs/streaming/stream.go index 723a569fd87..3e4d691b5c9 100644 --- a/tests/integration/suite/daprd/jobs/streaming/stream.go +++ b/tests/integration/suite/daprd/jobs/streaming/stream.go @@ -116,6 +116,7 @@ func (s *streaming) Run(t *testing.T, ctx context.Context) { Name: "test", Schedule: ptr.Of("@every 1s"), Repeats: ptr.Of(uint32(1)), + DueTime: ptr.Of("0m"), Data: &anypb.Any{ TypeUrl: "type.googleapis.com/google.type.Expr", }, diff --git a/tests/integration/suite/daprd/metrics/grpc/basic.go b/tests/integration/suite/daprd/metrics/grpc/basic.go index fddb2b7101f..e734620cc1f 100644 --- a/tests/integration/suite/daprd/metrics/grpc/basic.go +++ b/tests/integration/suite/daprd/metrics/grpc/basic.go @@ -18,6 +18,7 @@ import ( "fmt" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -77,8 +78,10 @@ func (b *basic) Run(t *testing.T, ctx context.Context) { require.NoError(t, err) // Verify metrics - metrics := b.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/InvokeService|grpc_server_status:OK"])) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + metrics := b.daprd.Metrics(t, ctx) + assert.Equal(c, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/InvokeService|grpc_server_status:OK"])) + }, time.Second*10, time.Millisecond*10) }) t.Run("state stores", func(t *testing.T) { @@ -99,8 +102,10 @@ func (b *basic) Run(t *testing.T, ctx context.Context) { require.NoError(t, err) // Verify metrics - metrics := b.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/SaveState|grpc_server_status:OK"])) - assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/GetState|grpc_server_status:OK"])) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + metrics := b.daprd.Metrics(t, ctx) + assert.Equal(c, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/SaveState|grpc_server_status:OK"])) + assert.Equal(c, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/GetState|grpc_server_status:OK"])) + }, time.Second*10, time.Millisecond*10) }) } diff --git a/tests/integration/suite/daprd/metrics/http/basic.go b/tests/integration/suite/daprd/metrics/http/basic.go index 7349110e83d..ba1358205c2 100644 --- a/tests/integration/suite/daprd/metrics/http/basic.go +++ b/tests/integration/suite/daprd/metrics/http/basic.go @@ -75,7 +75,9 @@ func (b *basic) Run(t *testing.T, ctx context.Context) { b.daprd.HTTPGet2xx(t, ctx, "/v1.0/state/mystore/myvalue") metrics := b.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"])) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"])) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"])) + assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"])) + }, time.Second*3, time.Millisecond*10) }) } diff --git a/tests/integration/suite/daprd/metrics/http/cardinality/low.go b/tests/integration/suite/daprd/metrics/http/cardinality/low.go index fbf6e2b9fef..87d1cb58f5a 100644 --- a/tests/integration/suite/daprd/metrics/http/cardinality/low.go +++ b/tests/integration/suite/daprd/metrics/http/cardinality/low.go @@ -19,6 +19,7 @@ import ( "net/http" "strings" "testing" + "time" "github.com/stretchr/testify/assert" @@ -83,26 +84,32 @@ func (l *low) Run(t *testing.T, ctx context.Context) { t.Run("service invocation", func(t *testing.T) { l.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/hi") - metrics := l.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"])) - assert.Equal(t, 1, int(metrics["dapr_http_client_completed_count|app_id:myapp|method:GET|path:/dapr/config|status:200"])) - assert.NotContains(t, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200") - assert.NotContains(t, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/healthz|status:204 1.000000") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + metrics := l.daprd.Metrics(t, ctx) + assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"])) + assert.Equal(c, 1, int(metrics["dapr_http_client_completed_count|app_id:myapp|method:GET|path:/dapr/config|status:200"])) + assert.NotContains(c, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200") + assert.NotContains(c, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/healthz|status:204 1.000000") + }, time.Second*5, time.Millisecond*10) }) t.Run("state stores", func(t *testing.T) { body := `[{"key":"myvalue", "value":"hello world"}]` l.daprd.HTTPPost2xx(t, ctx, "/v1.0/state/mystore", strings.NewReader(body), "content-type", "application/json") l.daprd.HTTPGet2xx(t, ctx, "/v1.0/state/mystore/myvalue") - metrics := l.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:204"])) - assert.Equal(t, 2, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"])) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + metrics := l.daprd.Metrics(t, ctx) + assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:204"])) + assert.Equal(c, 2, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"])) + }, time.Second*5, time.Millisecond*10) }) t.Run("actor invocation", func(t *testing.T) { l.daprd.HTTPPost2xx(t, ctx, "/v1.0/actors/myactortype/myactorid/method/foo", nil, "content-type", "application/json") - metrics := l.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:200"])) - assert.NotContains(t, metrics, "method:InvokeActor/myactortype.") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + metrics := l.daprd.Metrics(t, ctx) + assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:200"])) + assert.NotContains(c, metrics, "method:InvokeActor/myactortype.") + }, time.Second*5, time.Millisecond*10) }) } diff --git a/tests/integration/suite/daprd/metrics/http/distributionbuckets/high.go b/tests/integration/suite/daprd/metrics/http/distributionbuckets/high.go index 281bbb995dd..4d0feced51d 100644 --- a/tests/integration/suite/daprd/metrics/http/distributionbuckets/high.go +++ b/tests/integration/suite/daprd/metrics/http/distributionbuckets/high.go @@ -20,6 +20,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -74,15 +75,17 @@ func (h *high) Run(t *testing.T, ctx context.Context) { t.Run("service invocation", func(t *testing.T) { h.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/hi") - metrics := h.daprd.Metrics(t, ctx) var httpServerLatencyBuckets []float64 - for k := range metrics { - if strings.HasPrefix(k, "dapr_http_server_latency_bucket") && strings.Contains(k, "app_id:myapp") && strings.Contains(k, "status:200") { - bucket := getBucketFromKey(t, k) - httpServerLatencyBuckets = append(httpServerLatencyBuckets, bucket) + require.EventuallyWithT(t, func(c *assert.CollectT) { + metrics := h.daprd.Metrics(t, ctx) + for k := range metrics { + if strings.HasPrefix(k, "dapr_http_server_latency_bucket") && strings.Contains(k, "app_id:myapp") && strings.Contains(k, "status:200") { + bucket := getBucketFromKey(t, k) + httpServerLatencyBuckets = append(httpServerLatencyBuckets, bucket) + } } - } - require.NotEmpty(t, httpServerLatencyBuckets) + assert.NotEmpty(c, httpServerLatencyBuckets) + }, time.Second*5, time.Millisecond*10) sort.Slice(httpServerLatencyBuckets, func(i, j int) bool { return httpServerLatencyBuckets[i] < httpServerLatencyBuckets[j] }) // default copied from pkg/config/configuration.go:277 defaultLatencyDistribution := []float64{1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000} diff --git a/tests/integration/suite/daprd/metrics/http/excludeverbs/default.go b/tests/integration/suite/daprd/metrics/http/excludeverbs/default.go index 768beabeae9..41b03f92bd3 100644 --- a/tests/integration/suite/daprd/metrics/http/excludeverbs/default.go +++ b/tests/integration/suite/daprd/metrics/http/excludeverbs/default.go @@ -18,6 +18,7 @@ import ( "fmt" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" @@ -69,6 +70,8 @@ func (h *defaultExcludeVerbs) Run(t *testing.T, ctx context.Context) { t.Run("service invocation - default", func(t *testing.T) { h.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/orders/123") metrics := h.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/orders/123|status:200"])) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/orders/123|status:200"])) + }, time.Second*5, time.Millisecond*10) }) } diff --git a/tests/integration/suite/daprd/metrics/http/excludeverbs/excludeverbs.go b/tests/integration/suite/daprd/metrics/http/excludeverbs/excludeverbs.go index eb2e3df1b81..69a4982c420 100644 --- a/tests/integration/suite/daprd/metrics/http/excludeverbs/excludeverbs.go +++ b/tests/integration/suite/daprd/metrics/http/excludeverbs/excludeverbs.go @@ -18,6 +18,7 @@ import ( "fmt" "net/http" "testing" + "time" "github.com/stretchr/testify/assert" @@ -71,7 +72,9 @@ func (h *excludeVerbs) Run(t *testing.T, ctx context.Context) { t.Run("service invocation - exclude http verbs", func(t *testing.T) { h.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/orders/123") - metrics := h.daprd.Metrics(t, ctx) - assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:|path:/v1.0/invoke/myapp/method/orders/123|status:200"])) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + metrics := h.daprd.Metrics(t, ctx) + assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:|path:/v1.0/invoke/myapp/method/orders/123|status:200"])) + }, time.Second*10, time.Millisecond*10) }) } diff --git a/tests/integration/suite/daprd/pubsub/grpc/appready.go b/tests/integration/suite/daprd/pubsub/grpc/appready.go index 8d280df8b61..8434cf602b7 100644 --- a/tests/integration/suite/daprd/pubsub/grpc/appready.go +++ b/tests/integration/suite/daprd/pubsub/grpc/appready.go @@ -143,19 +143,22 @@ func (a *appready) Run(t *testing.T, ctx context.Context) { return resp.StatusCode == http.StatusOK }, time.Second*5, 10*time.Millisecond) - _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ - PubsubName: "mypubsub", - Topic: "mytopic", - Data: []byte(`{"status": "completed"}`), - }) - require.NoError(t, err) + assert.Eventually(t, func() bool { + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "mypubsub", + Topic: "mytopic", + Data: []byte(`{"status": "completed"}`), + }) + require.NoError(t, err) - select { - case resp := <-a.topicChan: - assert.Equal(t, "/myroute", resp) - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout waiting for topic to return") - } + select { + case resp := <-a.topicChan: + assert.Equal(t, "/myroute", resp) + return true + case <-time.After(time.Second): + return false + } + }, time.Second*5, time.Millisecond*10) // Should stop sending messages to subscribed app when it becomes unhealthy. a.appHealthy.Store(false) diff --git a/tests/integration/suite/daprd/shutdown/block/app/healthy.go b/tests/integration/suite/daprd/shutdown/block/app/healthy.go index cbd019c0a00..85d8c8330fb 100644 --- a/tests/integration/suite/daprd/shutdown/block/app/healthy.go +++ b/tests/integration/suite/daprd/shutdown/block/app/healthy.go @@ -29,7 +29,6 @@ import ( rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/dapr/tests/integration/framework" "github.com/dapr/dapr/tests/integration/framework/process/daprd" - "github.com/dapr/dapr/tests/integration/framework/process/exec" prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" "github.com/dapr/dapr/tests/integration/framework/process/logline" "github.com/dapr/dapr/tests/integration/suite" @@ -92,7 +91,7 @@ func (h *healthy) Setup(t *testing.T) []framework.Option { daprd.WithAppHealthCheckPath("/healthz"), daprd.WithAppHealthProbeInterval(1), daprd.WithAppHealthProbeThreshold(1), - daprd.WithExecOptions(exec.WithStdout(h.logline.Stdout())), + daprd.WithLogLineStdout(h.logline), daprd.WithResourceFiles(` apiVersion: dapr.io/v1alpha1 kind: Component diff --git a/tests/integration/suite/daprd/shutdown/block/app/pubsub/single.go b/tests/integration/suite/daprd/shutdown/block/app/pubsub/single.go index 699f21edbc6..49d8c552bae 100644 --- a/tests/integration/suite/daprd/shutdown/block/app/pubsub/single.go +++ b/tests/integration/suite/daprd/shutdown/block/app/pubsub/single.go @@ -113,6 +113,9 @@ func (s *single) Run(t *testing.T, ctx context.Context) { client := s.daprd.GRPCClient(t, ctx) assert.Len(t, s.daprd.GetMetaRegisteredComponents(t, ctx), 1) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Len(c, s.daprd.GetMetaSubscriptions(t, ctx), 2) + }, time.Second*10, time.Millisecond, 10) _, err := client.PublishEvent(ctx, &rtv1.PublishEventRequest{ PubsubName: "foo",