Skip to content

Commit

Permalink
e2e: add CEL to accesslog test (#3730)
Browse files Browse the repository at this point in the history
* e2e: fix accesslog test

Signed-off-by: zirain <[email protected]>

* refactor test

Signed-off-by: zirain <[email protected]>

* negative test

Signed-off-by: zirain <[email protected]>

* fix match

Signed-off-by: zirain <[email protected]>

---------

Signed-off-by: zirain <[email protected]>
  • Loading branch information
zirain authored Jul 3, 2024
1 parent de8a53d commit 5050e36
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 195 deletions.
2 changes: 2 additions & 0 deletions test/config/gatewayclass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ spec:
type: Text
text: |
[%START_TIME%] %METADATA(ROUTE:envoy-gateway:resources)% "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%"
matches:
- "'x-envoy-logged' in request.headers"
sinks:
- type: File
file:
Expand Down
290 changes: 95 additions & 195 deletions test/e2e/tests/accesslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,11 @@ package tests

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
httputils "sigs.k8s.io/gateway-api/conformance/utils/http"
"sigs.k8s.io/gateway-api/conformance/utils/kubernetes"
"sigs.k8s.io/gateway-api/conformance/utils/suite"
Expand All @@ -37,7 +29,14 @@ var FileAccessLogTest = suite.ConformanceTest{
Description: "Make sure file access log is working",
Manifests: []string{"testdata/accesslog-file.yaml"},
Test: func(t *testing.T, suite *suite.ConformanceTestSuite) {
t.Run("Stdout", func(t *testing.T) {
labels := map[string]string{
"job": "fluentbit",
"k8s_namespace_name": "envoy-gateway-system",
"k8s_container_name": "envoy",
}
match := "test-annotation-value"

t.Run("Positive", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-file", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
Expand All @@ -46,6 +45,9 @@ var FileAccessLogTest = suite.ConformanceTest{
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/file",
Headers: map[string]string{
"x-envoy-logged": "1",
},
},
Response: httputils.Response{
StatusCode: 200,
Expand All @@ -55,70 +57,29 @@ var FileAccessLogTest = suite.ConformanceTest{
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

labels := map[string]string{
"job": "fluentbit",
"k8s_namespace_name": "envoy-gateway-system",
"k8s_container_name": "envoy",
}
// let's wait for the log to be sent to stdout
if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
count, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "test-annotation-value")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

if count > 0 {
return true, nil
}
return false, nil
}); err != nil {
t.Errorf("failed to wait log flush to loki: %v", err)
}

if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
preCount, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "test-annotation-value")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

// fluent-bit needs some time to collect the log and send it to loki,
// so keep polling for a while
if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 15*time.Second, true, func(_ context.Context) (bool, error) {
count, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "test-annotation-value")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

delta := count - preCount
if delta == 1 {
return true, nil
}
runLogTest(t, suite, gwAddr, expectedResponse, labels, match, 1)
})

t.Logf("preCount=%d, count=%d", preCount, count)
return false, nil
}); err != nil {
return false, nil
}
t.Run("Negative", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-file", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
gwAddr := kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN)

return true, nil
}); err != nil {
t.Errorf("failed to get log count from loki: %v", err)
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/file",
// envoy will not log this request without the header x-envoy-logged
},
Response: httputils.Response{
StatusCode: 200,
},
Namespace: ns,
}
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

runLogTest(t, suite, gwAddr, expectedResponse, labels, match, 0)
})
},
}
Expand All @@ -128,7 +89,12 @@ var OpenTelemetryTest = suite.ConformanceTest{
Description: "Make sure OpenTelemetry access log is working",
Manifests: []string{"testdata/accesslog-otel.yaml"},
Test: func(t *testing.T, suite *suite.ConformanceTestSuite) {
t.Run("OTel", func(t *testing.T) {
labels := map[string]string{
"k8s_namespace_name": "envoy-gateway-system",
"exporter": "OTLP",
}

t.Run("Positive", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-otel", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
Expand All @@ -137,6 +103,9 @@ var OpenTelemetryTest = suite.ConformanceTest{
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/otel",
Headers: map[string]string{
"x-envoy-logged": "1",
},
},
Response: httputils.Response{
StatusCode: 200,
Expand All @@ -146,47 +115,29 @@ var OpenTelemetryTest = suite.ConformanceTest{
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

labels := map[string]string{
"k8s_namespace_name": "envoy-gateway-system",
"exporter": "OTLP",
}
if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
preCount, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 10*time.Second, true, func(_ context.Context) (bool, error) {
count, err := QueryLogCountFromLoki(t, suite.Client, types.NamespacedName{
Namespace: "envoy-gateway-system",
}, labels, "")
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

delta := count - preCount
if delta == 1 {
return true, nil
}
runLogTest(t, suite, gwAddr, expectedResponse, labels, "", 1)
})

t.Logf("preCount=%d, count=%d", preCount, count)
return false, nil
}); err != nil {
return false, nil
}
t.Run("Negative", func(t *testing.T) {
ns := "gateway-conformance-infra"
routeNN := types.NamespacedName{Name: "accesslog-otel", Namespace: ns}
gwNN := types.NamespacedName{Name: "same-namespace", Namespace: ns}
gwAddr := kubernetes.GatewayAndHTTPRoutesMustBeAccepted(t, suite.Client, suite.TimeoutConfig, suite.ControllerName, kubernetes.NewGatewayRef(gwNN), routeNN)

return true, nil
}); err != nil {
t.Errorf("failed to get log count from loki: %v", err)
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/otel",
// envoy will not log this request without the header x-envoy-logged
},
Response: httputils.Response{
StatusCode: 200,
},
Namespace: ns,
}
// make sure listener is ready
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

runLogTest(t, suite, gwAddr, expectedResponse, labels, "", 0)
})
},
}
Expand All @@ -205,6 +156,9 @@ var ALSTest = suite.ConformanceTest{
expectedResponse := httputils.ExpectedResponse{
Request: httputils.Request{
Path: "/als",
Headers: map[string]string{
"x-envoy-logged": "1",
},
},
Response: httputils.Response{
StatusCode: 200,
Expand All @@ -225,96 +179,42 @@ var ALSTest = suite.ConformanceTest{
},
}

func ALSLogCount(t *testing.T, suite *suite.ConformanceTestSuite) int {
metricPath, err := RetrieveURL(suite.Client, types.NamespacedName{
Namespace: "monitoring",
Name: "envoy-als",
}, 19001, "/metrics")
if err != nil {
t.Fatalf("failed to get metric url: %v", err)
}

countMetric, err := RetrieveMetric(metricPath, "log_count", time.Second)
if err != nil {
t.Fatalf("failed to get metric: %v", err)
}

total := 0
for _, m := range countMetric.Metric {
if m.Counter != nil && m.Counter.Value != nil {
total += int(*m.Counter.Value)
}
}

return total
}

// QueryLogCountFromLoki queries log count from loki
// TODO: move to utils package if needed
func QueryLogCountFromLoki(t *testing.T, c client.Client, nn types.NamespacedName, keyValues map[string]string, match string) (int, error) {
svc := corev1.Service{}
if err := c.Get(context.Background(), types.NamespacedName{
Namespace: "monitoring",
Name: "loki",
}, &svc); err != nil {
return -1, err
}
lokiHost := ""
for _, ing := range svc.Status.LoadBalancer.Ingress {
if ing.IP != "" {
lokiHost = ing.IP
break
}
}

qParams := make([]string, 0, len(keyValues))
for k, v := range keyValues {
qParams = append(qParams, fmt.Sprintf("%s=\"%s\"", k, v))
}

q := "{" + strings.Join(qParams, ",") + "}"
if match != "" {
q = q + "|~\"" + match + "\""
}
params := url.Values{}
params.Add("query", q)
params.Add("start", fmt.Sprintf("%d", time.Now().Add(-10*time.Minute).Unix())) // query logs from last 10 minutes
lokiQueryURL := fmt.Sprintf("http://%s:3100/loki/api/v1/query_range?%s", lokiHost, params.Encode())
res, err := http.DefaultClient.Get(lokiQueryURL)
if err != nil {
return -1, err
}
t.Logf("get response from loki, query=%s, status=%s", q, res.Status)
func runLogTest(t *testing.T, suite *suite.ConformanceTestSuite, gwAddr string,
expectedResponse httputils.ExpectedResponse, expectedLabels map[string]string, expectedMatch string, expectedDelta int,
) {
if err := wait.PollUntilContextTimeout(context.TODO(), time.Second, time.Minute, true,
func(ctx context.Context) (bool, error) {
// query log count from loki
preCount, err := QueryLogCountFromLoki(t, suite.Client, expectedLabels, expectedMatch)
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

b, err := io.ReadAll(res.Body)
if err != nil {
return -1, err
}
httputils.MakeRequestAndExpectEventuallyConsistentResponse(t, suite.RoundTripper, suite.TimeoutConfig, gwAddr, expectedResponse)

lokiResponse := &LokiQueryResponse{}
if err := json.Unmarshal(b, lokiResponse); err != nil {
return -1, err
}
// fluent-bit needs some time to collect the log and send it to loki,
// so keep polling for a while until the count changes by the expected delta
if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 15*time.Second, true, func(_ context.Context) (bool, error) {
count, err := QueryLogCountFromLoki(t, suite.Client, expectedLabels, expectedMatch)
if err != nil {
t.Logf("failed to get log count from loki: %v", err)
return false, nil
}

if len(lokiResponse.Data.Result) == 0 {
return 0, nil
}
delta := count - preCount
if delta == expectedDelta {
return true, nil
}

total := 0
for _, res := range lokiResponse.Data.Result {
total += len(res.Values)
}
t.Logf("get response from loki, query=%s, total=%d", q, total)
return total, nil
}
t.Logf("preCount=%d, count=%d", preCount, count)
return false, nil
}); err != nil {
return false, nil
}

type LokiQueryResponse struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric interface{}
Values []interface{} `json:"values"`
}
return true, nil
}); err != nil {
t.Errorf("failed to get log count from loki: %v", err)
}
}
Loading

0 comments on commit 5050e36

Please sign in to comment.