Skip to content

Commit

Permalink
Query: Forward tenant information via StoreAPI (thanos-io#6530)
Browse files Browse the repository at this point in the history
* Querier: Forward tenant information downstream

With this commit we attach tenant information to each query request and
forward it via the StoreAPI to any downstream Store Gateways and
Queriers.

We add the following command-line options, which mimic the tenant
functionality in Receive. The options are currently hidden, as they
provide no real functionality yet. This will come in future steps.

--query.tenant-header
--query.default-tenant
--query.tenant-certificate-field

Signed-off-by: Jacob Baungard Hansen <[email protected]>

* Receive: Use CertificateField from Tenancy pkg

These consts are now defined in the Tenancy package, so we should use
those instead.

Signed-off-by: Jacob Baungard Hansen <[email protected]>

---------

Signed-off-by: Jacob Baungard Hansen <[email protected]>
Signed-off-by: rita.canavarro <[email protected]>
  • Loading branch information
jacobbaungard authored and rita.canavarro committed Jul 20, 2023
1 parent a10de73 commit 0fa7e40
Show file tree
Hide file tree
Showing 14 changed files with 430 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ NOTE: Querier's `query.promql-engine` flag enabling new PromQL engine is now unh
- [#5741](https://github.com/thanos-io/thanos/pull/5741) Query: add metrics on how much data is being selected by downstream Store APIs.
- [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change.
- [#5749](https://github.com/thanos-io/thanos/pull/5749) Query Frontend: Added small LRU cache to cache query analysis results.
- [#6500](https://github.com/thanos-io/thanos/pull/6500) Shipper: Add metric `thanos_shipper_uploaded_bytes_total` for the total number of uploaded bytes.

### Changed

Expand Down
14 changes: 14 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -218,6 +219,10 @@ func registerQuery(app *extkingpin.App) {
queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Float64List()
queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Float64List()

tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).Hidden().String()
defaultTenant := cmd.Flag("query.default-tenant", "Name of the default tenant.").Default(tenancy.DefaultTenant).Hidden().String()
tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Hidden().Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName)

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

Expand Down Expand Up @@ -337,6 +342,9 @@ func registerQuery(app *extkingpin.App) {
*defaultEngine,
storeRateLimits,
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
*tenantCertField,
)
})
}
Expand Down Expand Up @@ -413,6 +421,9 @@ func runQuery(
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
tenantCertField string,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -747,6 +758,9 @@ func runQuery(
queryTelemetrySeriesQuantiles,
),
reg,
tenantHeader,
defaultTenant,
tenantCertField,
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(tenancy.DefaultTenantHeader).StringVar(&rc.tenantHeader)

cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+receive.CertificateFieldOrganization+", "+receive.CertificateFieldOrganizationalUnit+" or "+receive.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", receive.CertificateFieldOrganization, receive.CertificateFieldOrganizationalUnit, receive.CertificateFieldCommonName)
cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName)

cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(tenancy.DefaultTenant).StringVar(&rc.defaultTenantID)

Expand Down
50 changes: 48 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -161,6 +162,10 @@ type QueryAPI struct {
queryRangeHist prometheus.Histogram

seriesStatsAggregator seriesQueryPerformanceMetricsAggregator

tenantHeader string
defaultTenant string
tenantCertField string
}

type seriesQueryPerformanceMetricsAggregator interface {
Expand Down Expand Up @@ -196,6 +201,9 @@ func NewQueryAPI(
gate gate.Gate,
statsAggregator seriesQueryPerformanceMetricsAggregator,
reg *prometheus.Registry,
tenantHeader string,
defaultTenant string,
tenantCertField string,
) *QueryAPI {
if statsAggregator == nil {
statsAggregator = &store.NoopSeriesStatsAggregator{}
Expand Down Expand Up @@ -226,6 +234,9 @@ func NewQueryAPI(
defaultMetadataTimeRange: defaultMetadataTimeRange,
disableCORS: disableCORS,
seriesStatsAggregator: statsAggregator,
tenantHeader: tenantHeader,
defaultTenant: defaultTenant,
tenantCertField: tenantCertField,

queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_query_range_requested_timespan_duration_seconds",
Expand Down Expand Up @@ -505,6 +516,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
lookbackDelta = lookbackDeltaFromReq
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()
Expand Down Expand Up @@ -665,6 +683,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
lookbackDelta = lookbackDeltaFromReq
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

// Record the query range requested.
qapi.queryRangeHist.Observe(end.Sub(start).Seconds())

Expand Down Expand Up @@ -770,6 +795,13 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
true,
nil,
Expand Down Expand Up @@ -866,6 +898,13 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr, func() {}
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "")
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
enableDedup,
replicaLabels,
Expand All @@ -876,7 +915,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
Expand Down Expand Up @@ -926,6 +965,13 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "")
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
true,
nil,
Expand All @@ -936,7 +982,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ func TestQueryEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}

start := time.Unix(0, 0)
Expand Down Expand Up @@ -744,6 +746,8 @@ func TestMetadataEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}
apiWithLabelLookback := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Expand All @@ -759,6 +763,8 @@ func TestMetadataEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}

var tests = []endpointTestCase{
Expand Down Expand Up @@ -1229,6 +1235,8 @@ func TestStoresEndpoint(t *testing.T) {
},
}
},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}
apiWithInvalidEndpoint := &QueryAPI{
endpointStatus: func() []query.EndpointStatus {
Expand Down
4 changes: 4 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -340,6 +341,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

// TODO(bwplotka): Use inprocess gRPC when we want to stream responses.
// Currently streaming won't help due to nature of the both PromQL engine which
Expand Down Expand Up @@ -419,6 +421,7 @@ func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]strin

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
if err != nil {
Expand Down Expand Up @@ -452,6 +455,7 @@ func (q *querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.War

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ const (
labelError = "error"
)

// Allowed fields in client certificates.
const (
CertificateFieldOrganization = "organization"
CertificateFieldOrganizationalUnit = "organizationalUnit"
CertificateFieldCommonName = "commonName"
)

var (
// errConflict is returned whenever an operation fails due to any conflict-type error.
errConflict = errors.New("conflict")
Expand Down
26 changes: 25 additions & 1 deletion pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type metrics struct {
uploads prometheus.Counter
uploadFailures prometheus.Counter
uploadedCompacted prometheus.Gauge
uploadedBytes prometheus.Counter
}

func newMetrics(reg prometheus.Registerer) *metrics {
Expand All @@ -63,6 +64,11 @@ func newMetrics(reg prometheus.Registerer) *metrics {
Name: "thanos_shipper_upload_compacted_done",
Help: "If 1 it means shipper uploaded all compacted blocks from the filesystem.",
})
m.uploadedBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_shipper_uploaded_bytes_total",
Help: "Total number of uploaded bytes.",
})

return &m
}

Expand Down Expand Up @@ -383,7 +389,25 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
if err := meta.WriteToDir(s.logger, updir); err != nil {
return errors.Wrap(err, "write meta file")
}
return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)

err := block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)

if err != nil {
return errors.Wrap(err, "while upploading the block")
}

files, err := block.GatherFileStats(updir, s.hashFunc, s.logger)

if err != nil {
// The block upload has already succeeded at this point; a failure to gather file stats for the metric must not turn it into an error.
return nil
}

for _, x := range files {
s.metrics.uploadedBytes.Add(float64(x.SizeBytes))
}

return nil
}

// blockMetasFromOldest returns the block meta of each block found in dir
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -1229,6 +1230,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer s.queryGate.Done()
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(srv.Context())
if err != nil {
level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for Series request", "tenant", tenant)

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -1478,6 +1485,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx)
if err != nil {
level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for LabelNames request", "tenant", tenant)

resHints := &hintspb.LabelNamesResponseHints{}

var reqBlockMatchers []*labels.Matcher
Expand Down Expand Up @@ -1666,6 +1679,12 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx)
if err != nil {
level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for LabelValues request", "tenant", tenant)

resHints := &hintspb.LabelValuesResponseHints{}

g, gctx := errgroup.WithContext(ctx)
Expand Down
Loading

0 comments on commit 0fa7e40

Please sign in to comment.