diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d97369bbd2a..ed80269bf7d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ * [7720](https://github.com/grafana/loki/pull/7720) **sandeepsukhani**: fix bugs in processing delete requests with line filters. +* [7708](https://github.com/grafana/loki/pull/7708) **DylanGuedes**: Fix multitenant querying. + ##### Changes #### Promtail diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go index eb480401b3e0..1b37725f8efa 100644 --- a/integration/cluster/cluster.go +++ b/integration/cluster/cluster.go @@ -30,6 +30,9 @@ auth_enabled: true server: http_listen_port: 0 grpc_listen_port: 0 + grpc_server_max_recv_msg_size: 110485813 + grpc_server_max_send_msg_size: 110485813 + common: path_prefix: {{.dataPath}} @@ -43,6 +46,12 @@ common: kvstore: store: inmemory +limits_config: + per_stream_rate_limit: 50MB + per_stream_rate_limit_burst: 50MB + ingestion_rate_mb: 50 + ingestion_burst_size_mb: 50 + storage_config: boltdb_shipper: shared_store: filesystem @@ -71,6 +80,9 @@ ingester: lifecycler: min_ready_duration: 0s +querier: + multi_tenant_queries_enabled: true + {{if .remoteWriteUrls}} ruler: wal: diff --git a/integration/multi_tenant_queries_test.go b/integration/multi_tenant_queries_test.go new file mode 100644 index 000000000000..9677c1e8d943 --- /dev/null +++ b/integration/multi_tenant_queries_test.go @@ -0,0 +1,66 @@ +package integration + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/integration/client" + "github.com/grafana/loki/integration/cluster" +) + +func TestMultiTenantQuery(t *testing.T) { + clu := cluster.New() + defer func() { + assert.NoError(t, clu.Cleanup()) + }() + + var ( + tAll = clu.AddComponent( + "all", + "-target=all", + ) + ) + + require.NoError(t, clu.Run()) + + cliTenant1 := client.New("org1", "", tAll.HTTPURL()) + cliTenant2 := client.New("org2", "", tAll.HTTPURL()) + cliMultitenant := client.New("org1|org2", "", tAll.HTTPURL()) + + // ingest log lines for tenant 1 and tenant 2. + require.NoError(t, cliTenant1.PushLogLineWithTimestamp("lineA", cliTenant1.Now.Add(-45*time.Minute), map[string]string{"job": "fake1"})) + require.NoError(t, cliTenant2.PushLogLineWithTimestamp("lineB", cliTenant2.Now.Add(-45*time.Minute), map[string]string{"job": "fake2"})) + + // check that tenant1 only have access to log line A. + matchLines(t, cliTenant1, `{job="fake2"}`, []string{}) + matchLines(t, cliTenant1, `{job=~"fake.*"}`, []string{"lineA"}) + matchLines(t, cliTenant1, `{job="fake1"}`, []string{"lineA"}) + + // check that tenant2 only have access to log line B. + matchLines(t, cliTenant2, `{job="fake1"}`, []string{}) + matchLines(t, cliTenant2, `{job=~"fake.*"}`, []string{"lineB"}) + matchLines(t, cliTenant2, `{job="fake2"}`, []string{"lineB"}) + + // check that multitenant has access to all log lines on same query. + matchLines(t, cliMultitenant, `{job=~"fake.*"}`, []string{"lineA", "lineB"}) + matchLines(t, cliMultitenant, `{job="fake1"}`, []string{"lineA"}) + matchLines(t, cliMultitenant, `{job="fake2"}`, []string{"lineB"}) + matchLines(t, cliMultitenant, `{job="fake3"}`, []string{}) +} + +func matchLines(t *testing.T, client *client.Client, labels string, expectedLines []string) { + resp, err := client.RunRangeQuery(context.Background(), labels) + require.NoError(t, err) + + var lines []string + for _, stream := range resp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + require.ElementsMatch(t, expectedLines, lines) +} diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 905ce43cf9b0..c44d2ab1a8c8 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -240,8 +240,8 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { } func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { - userID, _ := tenant.TenantID(ctx) - queryTimeout := q.limits.QueryTimeout(userID) + tenants, _ := tenant.TenantIDs(ctx) + queryTimeout := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, q.limits.QueryTimeout) ctx, cancel := context.WithTimeout(ctx, queryTimeout) defer cancel() diff --git a/pkg/querier/http.go b/pkg/querier/http.go index bf0cab2f7ce8..75fadff135b1 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "strconv" + "strings" "time" "github.com/go-kit/log" @@ -487,22 +488,21 @@ func WrapQuerySpanAndTimeout(call string, q *QuerierAPI) middleware.Interface { return middleware.Func(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { log, ctx := spanlogger.New(req.Context(), call) - userID, err := tenant.TenantID(ctx) + tenants, err := tenant.TenantIDs(ctx) if err != nil { level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err) serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w) return } - // Enforce the query timeout while querying backends - queryTimeout := q.limits.QueryTimeout(userID) + timeout := util_validation.SmallestPositiveNonZeroDurationPerTenant(tenants, q.limits.QueryTimeout) // TODO: remove this clause once we remove the deprecated query-timeout flag. - if q.cfg.QueryTimeout != 0 { // querier YAML configuration. - level.Warn(log).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "WrapQuerySpanAndTimeout") - queryTimeout = q.cfg.QueryTimeout + if q.cfg.QueryTimeout != 0 { // querier YAML configuration is still configured. + level.Warn(log).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "WrapQuerySpanAndTimeout", "org_id", strings.Join(tenants, ",")) + timeout = q.cfg.QueryTimeout } - newCtx, cancel := context.WithTimeout(ctx, queryTimeout) + newCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() newReq := req.WithContext(newCtx) diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 0c8a51e4a827..537012e5a009 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -99,17 +99,18 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que return ast.next.Do(ctx, r) } - userID, err := tenant.TenantID(ctx) + tenants, err := tenant.TenantIDs(ctx) if err != nil { return nil, err } + queryParallelism := validation.SmallestPositiveIntPerTenant(tenants, ast.limits.MaxQueryParallelism) resolver, ok := shardResolverForConf( ctx, conf, ast.ng.Opts().MaxLookBackPeriod, ast.logger, - ast.limits.MaxQueryParallelism(userID), + queryParallelism, r, ast.next, )