Skip to content

Commit

Permalink
Integration test stabilization (dapr#8001)
Browse files Browse the repository at this point in the history
* Integration test stabilization

Use collectT in Eventually for daprd/hotreload/selfhosted/subscriptions/http.go

Signed-off-by: joshvanl <[email protected]>

* Wrap publishing message for app ready in Eventually

Signed-off-by: joshvanl <[email protected]>

* Wrap metric check in Eventually

Signed-off-by: joshvanl <[email protected]>

* More stability

Signed-off-by: joshvanl <[email protected]>

* More eventuallys around metrics checks

Signed-off-by: joshvanl <[email protected]>

* Adds check to wait for subscription to be available

Signed-off-by: joshvanl <[email protected]>

* Increase timeout for placement to become ready

Signed-off-by: joshvanl <[email protected]>

* Lint

Signed-off-by: joshvanl <[email protected]>

* Add eventually around metrics

Signed-off-by: joshvanl <[email protected]>

* Increase timeout for job to be triggered

Signed-off-by: joshvanl <[email protected]>

* Reduce number in idtypes

Signed-off-by: joshvanl <[email protected]>

* Increase timeout for metrics

Signed-off-by: joshvanl <[email protected]>

* Trigger immediately

Signed-off-by: joshvanl <[email protected]>

* Move metrics Get inside Eventually

Signed-off-by: joshvanl <[email protected]>

* Adds closeCh check during channel message send

Signed-off-by: joshvanl <[email protected]>

* Print got output in logline failure

Signed-off-by: joshvanl <[email protected]>

* Fix logline error got output newline

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Co-authored-by: Loong Dai <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
  • Loading branch information
3 people authored Sep 2, 2024
1 parent 3ce38af commit 10fb096
Show file tree
Hide file tree
Showing 16 changed files with 95 additions and 57 deletions.
6 changes: 5 additions & 1 deletion pkg/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,13 @@ func (p *Service) ReportDaprStatus(stream placementv1pb.Placement_ReportDaprStat
}

if isActorRuntime {
p.membershipCh <- hostMemberChange{
select {
case p.membershipCh <- hostMemberChange{
cmdType: raft.MemberRemove,
host: raft.DaprHostMember{Name: registeredMemberID, Namespace: namespace},
}:
case <-p.closedCh:
return errors.New("placement service is closed")
}
}

Expand Down
6 changes: 5 additions & 1 deletion tests/integration/framework/process/logline/logline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package logline

import (
"bufio"
"bytes"
"context"
"errors"
"io"
Expand All @@ -39,6 +40,7 @@ type LogLine struct {
stderr io.ReadCloser
stderrExp io.WriteCloser
stderrLinContains map[string]bool
got bytes.Buffer

outCheck chan map[string]bool
closeCh chan struct{}
Expand Down Expand Up @@ -98,7 +100,7 @@ func (l *LogLine) Cleanup(t *testing.T) {
close(l.closeCh)
for i := 0; i < 2; i++ {
for expLine := range <-l.outCheck {
assert.Fail(t, "expected to log line", expLine)
assert.Fail(t, "expected to log line: "+expLine, l.got.String())
}
}
}
Expand Down Expand Up @@ -134,6 +136,8 @@ func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[strin
}
require.NoError(t, err)

l.got.Write(append(line, '\n'))

for expLine := range expLines {
if strings.Contains(string(line), expLine) {
delete(expLines, expLine)
Expand Down
20 changes: 10 additions & 10 deletions tests/integration/framework/process/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ func (p *Placement) Cleanup(t *testing.T) {
}

func (p *Placement) WaitUntilRunning(t *testing.T, ctx context.Context) {
t.Helper()

client := client.HTTP(t)
assert.Eventually(t, func() bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/healthz", p.healthzPort), nil)
if err != nil {
return false
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/healthz", p.healthzPort), nil)
require.NoError(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.Do(req)
if err != nil {
return false
//nolint:testifylint
if assert.NoError(c, err) {
defer resp.Body.Close()
assert.Equal(c, http.StatusOK, resp.StatusCode)
}
defer resp.Body.Close()
return http.StatusOK == resp.StatusCode
}, time.Second*5, 10*time.Millisecond)
}, time.Second*10, 10*time.Millisecond)
}

func (p *Placement) ID() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ spec:

i.daprdsNum = 4
i.actorTypesNum = 2
i.actorIDsNum = 25
i.actorIDsNum = 15
i.daprds = make([]*daprd.Daprd, i.daprdsNum)
i.actorDaprds = make([]actordaprd, i.daprdsNum)
procs := make([]process.Interface, i.daprdsNum*2+2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ spec:

require.NoError(t, os.Remove(filepath.Join(h.resDir2, "pubsub.yaml")))
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, h.daprd.GetMetaRegisteredComponents(t, ctx), 1)
assert.Len(c, h.daprd.GetMetaRegisteredComponents(c, ctx), 1)
}, time.Second*5, time.Millisecond*10)
h.sub.ExpectPublishReceive(t, ctx, newReq(h.daprd, "pubsub0", "d"))
h.sub.ExpectPublishError(t, ctx, newReq(h.daprd, "pubsub1", "c"))
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/suite/daprd/jobs/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (a *api) Run(t *testing.T, ctx context.Context) {
body := `{
"schedule": "@every 1s",
"repeats": 10,
"dueTime": "0s",
"data": {
"@type": "type.googleapis.com/google.protobuf.StringValue",
"value": "\"someData\""
Expand Down Expand Up @@ -119,7 +120,7 @@ func (a *api) Run(t *testing.T, ctx context.Context) {
assert.Equal(t, []byte(`"someData"`), bytes.TrimSpace(job.GetData().GetValue()))
assert.Equal(t, "type.googleapis.com/google.protobuf.StringValue", job.GetData().GetTypeUrl())

case <-time.After(time.Second * 3):
case <-time.After(time.Second * 10):
assert.Fail(t, "timed out waiting for triggered job")
}

Expand Down
1 change: 1 addition & 0 deletions tests/integration/suite/daprd/jobs/streaming/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (s *streaming) Run(t *testing.T, ctx context.Context) {
Name: "test",
Schedule: ptr.Of("@every 1s"),
Repeats: ptr.Of(uint32(1)),
DueTime: ptr.Of("0m"),
Data: &anypb.Any{
TypeUrl: "type.googleapis.com/google.type.Expr",
},
Expand Down
15 changes: 10 additions & 5 deletions tests/integration/suite/daprd/metrics/grpc/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -77,8 +78,10 @@ func (b *basic) Run(t *testing.T, ctx context.Context) {
require.NoError(t, err)

// Verify metrics
metrics := b.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/InvokeService|grpc_server_status:OK"]))
assert.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := b.daprd.Metrics(t, ctx)
assert.Equal(c, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/InvokeService|grpc_server_status:OK"]))
}, time.Second*10, time.Millisecond*10)
})

t.Run("state stores", func(t *testing.T) {
Expand All @@ -99,8 +102,10 @@ func (b *basic) Run(t *testing.T, ctx context.Context) {
require.NoError(t, err)

// Verify metrics
metrics := b.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/SaveState|grpc_server_status:OK"]))
assert.Equal(t, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/GetState|grpc_server_status:OK"]))
assert.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := b.daprd.Metrics(t, ctx)
assert.Equal(c, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/SaveState|grpc_server_status:OK"]))
assert.Equal(c, 1, int(metrics["dapr_grpc_io_server_completed_rpcs|app_id:myapp|grpc_server_method:/dapr.proto.runtime.v1.Dapr/GetState|grpc_server_status:OK"]))
}, time.Second*10, time.Millisecond*10)
})
}
6 changes: 4 additions & 2 deletions tests/integration/suite/daprd/metrics/http/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (b *basic) Run(t *testing.T, ctx context.Context) {
b.daprd.HTTPGet2xx(t, ctx, "/v1.0/state/mystore/myvalue")

metrics := b.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"]))
assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"]))
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:/v1.0/state/mystore|status:204"]))
assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/state/mystore|status:200"]))
}, time.Second*3, time.Millisecond*10)
})
}
29 changes: 18 additions & 11 deletions tests/integration/suite/daprd/metrics/http/cardinality/low.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -83,26 +84,32 @@ func (l *low) Run(t *testing.T, ctx context.Context) {

t.Run("service invocation", func(t *testing.T) {
l.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/hi")
metrics := l.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"]))
assert.Equal(t, 1, int(metrics["dapr_http_client_completed_count|app_id:myapp|method:GET|path:/dapr/config|status:200"]))
assert.NotContains(t, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200")
assert.NotContains(t, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/healthz|status:204 1.000000")
assert.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := l.daprd.Metrics(t, ctx)
assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"]))
assert.Equal(c, 1, int(metrics["dapr_http_client_completed_count|app_id:myapp|method:GET|path:/dapr/config|status:200"]))
assert.NotContains(c, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/hi|status:200")
assert.NotContains(c, metrics, "dapr_http_server_response_count|app_id:myapp|method:GET|path:/v1.0/healthz|status:204 1.000000")
}, time.Second*5, time.Millisecond*10)
})

t.Run("state stores", func(t *testing.T) {
body := `[{"key":"myvalue", "value":"hello world"}]`
l.daprd.HTTPPost2xx(t, ctx, "/v1.0/state/mystore", strings.NewReader(body), "content-type", "application/json")
l.daprd.HTTPGet2xx(t, ctx, "/v1.0/state/mystore/myvalue")
metrics := l.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:204"]))
assert.Equal(t, 2, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"]))
assert.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := l.daprd.Metrics(t, ctx)
assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:204"]))
assert.Equal(c, 2, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:|status:200"]))
}, time.Second*5, time.Millisecond*10)
})

t.Run("actor invocation", func(t *testing.T) {
l.daprd.HTTPPost2xx(t, ctx, "/v1.0/actors/myactortype/myactorid/method/foo", nil, "content-type", "application/json")
metrics := l.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:200"]))
assert.NotContains(t, metrics, "method:InvokeActor/myactortype.")
assert.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := l.daprd.Metrics(t, ctx)
assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:POST|path:|status:200"]))
assert.NotContains(c, metrics, "method:InvokeActor/myactortype.")
}, time.Second*5, time.Millisecond*10)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -74,15 +75,17 @@ func (h *high) Run(t *testing.T, ctx context.Context) {

t.Run("service invocation", func(t *testing.T) {
h.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/hi")
metrics := h.daprd.Metrics(t, ctx)
var httpServerLatencyBuckets []float64
for k := range metrics {
if strings.HasPrefix(k, "dapr_http_server_latency_bucket") && strings.Contains(k, "app_id:myapp") && strings.Contains(k, "status:200") {
bucket := getBucketFromKey(t, k)
httpServerLatencyBuckets = append(httpServerLatencyBuckets, bucket)
require.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := h.daprd.Metrics(t, ctx)
for k := range metrics {
if strings.HasPrefix(k, "dapr_http_server_latency_bucket") && strings.Contains(k, "app_id:myapp") && strings.Contains(k, "status:200") {
bucket := getBucketFromKey(t, k)
httpServerLatencyBuckets = append(httpServerLatencyBuckets, bucket)
}
}
}
require.NotEmpty(t, httpServerLatencyBuckets)
assert.NotEmpty(c, httpServerLatencyBuckets)
}, time.Second*5, time.Millisecond*10)
sort.Slice(httpServerLatencyBuckets, func(i, j int) bool { return httpServerLatencyBuckets[i] < httpServerLatencyBuckets[j] })
// default copied from pkg/config/configuration.go:277
defaultLatencyDistribution := []float64{1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -69,6 +70,8 @@ func (h *defaultExcludeVerbs) Run(t *testing.T, ctx context.Context) {
t.Run("service invocation - default", func(t *testing.T) {
h.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/orders/123")
metrics := h.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/orders/123|status:200"]))
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:GET|path:/v1.0/invoke/myapp/method/orders/123|status:200"]))
}, time.Second*5, time.Millisecond*10)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -71,7 +72,9 @@ func (h *excludeVerbs) Run(t *testing.T, ctx context.Context) {

t.Run("service invocation - exclude http verbs", func(t *testing.T) {
h.daprd.HTTPGet2xx(t, ctx, "/v1.0/invoke/myapp/method/orders/123")
metrics := h.daprd.Metrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:|path:/v1.0/invoke/myapp/method/orders/123|status:200"]))
assert.EventuallyWithT(t, func(c *assert.CollectT) {
metrics := h.daprd.Metrics(t, ctx)
assert.Equal(c, 1, int(metrics["dapr_http_server_request_count|app_id:myapp|method:|path:/v1.0/invoke/myapp/method/orders/123|status:200"]))
}, time.Second*10, time.Millisecond*10)
})
}
27 changes: 15 additions & 12 deletions tests/integration/suite/daprd/pubsub/grpc/appready.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,22 @@ func (a *appready) Run(t *testing.T, ctx context.Context) {
return resp.StatusCode == http.StatusOK
}, time.Second*5, 10*time.Millisecond)

_, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{
PubsubName: "mypubsub",
Topic: "mytopic",
Data: []byte(`{"status": "completed"}`),
})
require.NoError(t, err)
assert.Eventually(t, func() bool {
_, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{
PubsubName: "mypubsub",
Topic: "mytopic",
Data: []byte(`{"status": "completed"}`),
})
require.NoError(t, err)

select {
case resp := <-a.topicChan:
assert.Equal(t, "/myroute", resp)
case <-time.After(time.Second * 10):
assert.Fail(t, "timeout waiting for topic to return")
}
select {
case resp := <-a.topicChan:
assert.Equal(t, "/myroute", resp)
return true
case <-time.After(time.Second):
return false
}
}, time.Second*5, time.Millisecond*10)

// Should stop sending messages to subscribed app when it becomes unhealthy.
a.appHealthy.Store(false)
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/suite/daprd/shutdown/block/app/healthy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/exec"
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
"github.com/dapr/dapr/tests/integration/framework/process/logline"
"github.com/dapr/dapr/tests/integration/suite"
Expand Down Expand Up @@ -92,7 +91,7 @@ func (h *healthy) Setup(t *testing.T) []framework.Option {
daprd.WithAppHealthCheckPath("/healthz"),
daprd.WithAppHealthProbeInterval(1),
daprd.WithAppHealthProbeThreshold(1),
daprd.WithExecOptions(exec.WithStdout(h.logline.Stdout())),
daprd.WithLogLineStdout(h.logline),
daprd.WithResourceFiles(`
apiVersion: dapr.io/v1alpha1
kind: Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func (s *single) Run(t *testing.T, ctx context.Context) {
client := s.daprd.GRPCClient(t, ctx)

assert.Len(t, s.daprd.GetMetaRegisteredComponents(t, ctx), 1)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, s.daprd.GetMetaSubscriptions(t, ctx), 2)
}, time.Second*10, time.Millisecond, 10)

_, err := client.PublishEvent(ctx, &rtv1.PublishEventRequest{
PubsubName: "foo",
Expand Down

0 comments on commit 10fb096

Please sign in to comment.