Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: Fix multitenant querying #7708

Merged
merged 9 commits into from
Nov 24, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

* [7720](https://github.com/grafana/loki/pull/7720) **sandeepsukhani**: fix bugs in processing delete requests with line filters.

* [7708](https://github.com/grafana/loki/pull/7708) **DylanGuedes**: Fix multitenant querying.

##### Changes

#### Promtail
Expand Down
12 changes: 12 additions & 0 deletions integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ auth_enabled: true
server:
http_listen_port: 0
grpc_listen_port: 0
grpc_server_max_recv_msg_size: 110485813
grpc_server_max_send_msg_size: 110485813


common:
path_prefix: {{.dataPath}}
Expand All @@ -43,6 +46,12 @@ common:
kvstore:
store: inmemory

limits_config:
per_stream_rate_limit: 50MB
per_stream_rate_limit_burst: 50MB
ingestion_rate_mb: 50
ingestion_burst_size_mb: 50

storage_config:
boltdb_shipper:
shared_store: filesystem
Expand Down Expand Up @@ -71,6 +80,9 @@ ingester:
lifecycler:
min_ready_duration: 0s

querier:
multi_tenant_queries_enabled: true

{{if .remoteWriteUrls}}
ruler:
wal:
Expand Down
66 changes: 66 additions & 0 deletions integration/multi_tenant_queries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package integration

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
)

func TestMultiTenantQuery(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test!

clu := cluster.New()
defer func() {
assert.NoError(t, clu.Cleanup())
}()

var (
tAll = clu.AddComponent(
"all",
"-target=all",
)
)

require.NoError(t, clu.Run())

cliTenant1 := client.New("org1", "", tAll.HTTPURL())
cliTenant2 := client.New("org2", "", tAll.HTTPURL())
cliMultitenant := client.New("org1|org2", "", tAll.HTTPURL())

// ingest log lines for tenant 1 and tenant 2.
require.NoError(t, cliTenant1.PushLogLineWithTimestamp("lineA", cliTenant1.Now.Add(-45*time.Minute), map[string]string{"job": "fake1"}))
require.NoError(t, cliTenant2.PushLogLineWithTimestamp("lineB", cliTenant2.Now.Add(-45*time.Minute), map[string]string{"job": "fake2"}))

// check that tenant1 only have access to log line A.
matchLines(t, cliTenant1, `{job="fake2"}`, []string{})
matchLines(t, cliTenant1, `{job=~"fake.*"}`, []string{"lineA"})
matchLines(t, cliTenant1, `{job="fake1"}`, []string{"lineA"})

// check that tenant2 only have access to log line B.
matchLines(t, cliTenant2, `{job="fake1"}`, []string{})
matchLines(t, cliTenant2, `{job=~"fake.*"}`, []string{"lineB"})
matchLines(t, cliTenant2, `{job="fake2"}`, []string{"lineB"})

// check that multitenant has access to all log lines on same query.
matchLines(t, cliMultitenant, `{job=~"fake.*"}`, []string{"lineA", "lineB"})
matchLines(t, cliMultitenant, `{job="fake1"}`, []string{"lineA"})
matchLines(t, cliMultitenant, `{job="fake2"}`, []string{"lineB"})
matchLines(t, cliMultitenant, `{job="fake3"}`, []string{})
}

// matchLines runs a range query with the given LogQL label selector and
// asserts that exactly the expected log lines are returned, in any order.
// NOTE: the parameter is named c (not client) to avoid shadowing the imported
// client package inside the function body.
func matchLines(t *testing.T, c *client.Client, labels string, expectedLines []string) {
	resp, err := c.RunRangeQuery(context.Background(), labels)
	require.NoError(t, err)

	var lines []string
	for _, stream := range resp.Data.Stream {
		for _, val := range stream.Values {
			// Each value is a [timestamp, line] pair; index 1 is the line text.
			lines = append(lines, val[1])
		}
	}
	require.ElementsMatch(t, expectedLines, lines)
}
4 changes: 2 additions & 2 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
}

func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
userID, _ := tenant.TenantID(ctx)
queryTimeout := q.limits.QueryTimeout(userID)
tenants, _ := tenant.TenantIDs(ctx)
queryTimeout := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, q.limits.QueryTimeout)

ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
Expand Down
14 changes: 7 additions & 7 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -487,22 +488,21 @@ func WrapQuerySpanAndTimeout(call string, q *QuerierAPI) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
log, ctx := spanlogger.New(req.Context(), call)
userID, err := tenant.TenantID(ctx)
tenants, err := tenant.TenantIDs(ctx)
if err != nil {
level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err)
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(userID)
timeout := util_validation.SmallestPositiveNonZeroDurationPerTenant(tenants, q.limits.QueryTimeout)
// TODO: remove this clause once we remove the deprecated query-timeout flag.
if q.cfg.QueryTimeout != 0 { // querier YAML configuration.
level.Warn(log).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "WrapQuerySpanAndTimeout")
queryTimeout = q.cfg.QueryTimeout
if q.cfg.QueryTimeout != 0 { // querier YAML configuration is still configured.
level.Warn(log).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "WrapQuerySpanAndTimeout", "org_id", strings.Join(tenants, ","))
timeout = q.cfg.QueryTimeout
}

newCtx, cancel := context.WithTimeout(ctx, queryTimeout)
newCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

newReq := req.WithContext(newCtx)
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,18 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
return ast.next.Do(ctx, r)
}

userID, err := tenant.TenantID(ctx)
tenants, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
queryParallelism := validation.SmallestPositiveIntPerTenant(tenants, ast.limits.MaxQueryParallelism)

resolver, ok := shardResolverForConf(
ctx,
conf,
ast.ng.Opts().MaxLookBackPeriod,
ast.logger,
ast.limits.MaxQueryParallelism(userID),
queryParallelism,
r,
ast.next,
)
Expand Down