Add single compactor http client for delete and gennumber clients #7453

Merged
merged 1 commit on Nov 4, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@
* [7270](https://github.com/grafana/loki/pull/7270) **wilfriedroset**: Add support for `username` to redis cache configuration.

##### Fixes
* [7453](https://github.com/grafana/loki/pull/7453) **periklis**: Add single compactor http client for delete and gennumber clients
* [7426](https://github.com/grafana/loki/pull/7426) **periklis**: Add missing compactor delete client tls client config
* [7238](https://github.com/grafana/loki/pull/7328) **periklis**: Fix internal server bootstrap for query frontend
* [7288](https://github.com/grafana/loki/pull/7288) **ssncferreira**: Fix query mapping in AST mapper `rangemapper` to support the new `VectorExpr` expression.
4 changes: 2 additions & 2 deletions pkg/loki/loki.go
@@ -74,7 +74,7 @@ type Config struct {
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
DeleteClient deletion.Config `yaml:"delete_client,omitempty"`
CompactorClient compactor.ClientConfig `yaml:"compactor_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
@@ -115,7 +115,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Common.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
c.Querier.RegisterFlags(f)
c.DeleteClient.RegisterFlags(f)
c.CompactorClient.RegisterFlags(f)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f)
c.StorageConfig.RegisterFlags(f)
17 changes: 12 additions & 5 deletions pkg/loki/modules.go
@@ -350,7 +350,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
toMerge := []middleware.Interface{
httpreq.ExtractQueryMetricsMiddleware(),
}
if t.supportIndexDeleteRequest() {
if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled {
Collaborator:
What's this additional check for, and why wasn't it previously needed? You check it in combination with t.supportIndexDeleteRequest in multiple places, so I'm curious whether it should be moved into the t.supportIndexDeleteRequest function.

Collaborator (author):
As mentioned in the description, this extra check is needed when the deletes-based retention is entirely disabled. This is the case by default, IIRC, and it produces red-herring errors in the querier and ruler logs like this:

level=error ts=2022-10-18T14:13:41.598649348Z caller=delete_requests_client.go:211 msg="error getting delete requests from the store" err="unexpected status code: 404"
ts=2022-10-18T14:13:41.598697295Z caller=spanlogger.go:80 user=application level=error msg="failed loading deletes for user" err="unexpected status code: 404"

I found that moving it into supportIndexDeleteRequest obscures the fact that this method is dedicated to checking whether the underlying index store supports deletes. Adding RetentionEnabled there would hide the retention check behind the store capabilities. OTOH, as proposed in the implementation, it is more explicit for the reader that we need an index store that supports deletes plus an explicit opt-in to use the feature.

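For illustration only (not part of this diff), the alternative the reviewer suggests would fold the opt-in into the helper, roughly as sketched below; the PR instead keeps the two conditions separate so the helper stays a pure store-capability check.

// Hypothetical sketch of the folded variant; the store-capability logic is
// represented by a placeholder, and the helper name mirrors the existing one.
func (t *Loki) supportIndexDeleteRequest() bool {
	storeSupportsDeletes := true // existing index-store capability check goes here
	return storeSupportsDeletes && t.Cfg.CompactorConfig.RetentionEnabled
}

// With that, call sites such as initQuerier would only need
//   if t.supportIndexDeleteRequest() { ... }
// at the cost of hiding the retention opt-in inside the capability check.
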
toMerge = append(
toMerge,
queryrangebase.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader),
@@ -660,7 +660,8 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
t.Cfg.QueryRange,
util_log.Logger,
t.overrides,
t.Cfg.SchemaConfig, t.cacheGenerationLoader,
t.Cfg.SchemaConfig,
t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled,
prometheus.DefaultRegisterer,
)
if err != nil {
@@ -679,7 +680,13 @@ func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) {
if err != nil {
return nil, err
}
client, err = generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})

httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
}

client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient)
if err != nil {
return nil, err
}
@@ -1112,7 +1119,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
}

func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overrides) (deletion.DeleteRequestsClient, error) {
if !t.supportIndexDeleteRequest() {
if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled {
Collaborator:
Again, this seems to add to the argument that this additional check should be moved into that function.

Collaborator (author):
Same as above: it is more explicit for the reader that we need an index store that supports deletes plus an explicit opt-in to use the feature.

return deletion.NewNoOpDeleteRequestsStore(), nil
}

@@ -1121,7 +1128,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri
return nil, err
}

httpClient, err := deletion.NewDeleteHTTPClient(t.Cfg.DeleteClient)
httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/limits_test.go
@@ -52,7 +52,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg.CacheResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -237,7 +237,7 @@ func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
maxQueryParallelism: 1,
}, config.SchemaConfig{}, nil, nil)
}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
5 changes: 4 additions & 1 deletion pkg/querier/queryrange/queryrangebase/results_cache.go
@@ -163,6 +163,7 @@ type resultsCache struct {
merger Merger
cacheGenNumberLoader CacheGenNumberLoader
shouldCache ShouldCacheFn
retentionEnabled bool
metrics *ResultsCacheMetrics
}

@@ -181,6 +182,7 @@ func NewResultsCacheMiddleware(
extractor Extractor,
cacheGenNumberLoader CacheGenNumberLoader,
shouldCache ShouldCacheFn,
retentionEnabled bool,
metrics *ResultsCacheMetrics,
) (Middleware, error) {
if cacheGenNumberLoader != nil {
@@ -199,6 +201,7 @@
splitter: splitter,
cacheGenNumberLoader: cacheGenNumberLoader,
shouldCache: shouldCache,
retentionEnabled: retentionEnabled,
metrics: metrics,
}
}), nil
@@ -214,7 +217,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
return s.next.Do(ctx, r)
}

if s.cacheGenNumberLoader != nil {
if s.cacheGenNumberLoader != nil && s.retentionEnabled {
Collaborator:
Again, I'm not quite sure why we have to add this check now when it wasn't needed before. It seems out of scope for adding TLS support to the client, so I'm curious what motivated it.

Collaborator (author):
It may seem out of scope, but fixing TLS makes the 404s visible in the querier/ruler logs. If you run the entire system without TLS you see the 404s anyway when retention is disabled. In any case, both issues (the missing TLS support and the 404s on disabled retention) are oversights of the compactor client. IMHO the PR just removes these oversights, and we may need to re-title/change the changelog entry to be more informative in that direction. WDYT?

Collaborator:
"fixing TLS makes the 404s visible in the querier/ruler log"

Sounds like good enough justification to me! Thanks for that added context.

ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs))
}

5 changes: 5 additions & 0 deletions pkg/querier/queryrange/queryrangebase/results_cache_test.go
@@ -765,6 +765,7 @@ func TestResultsCache(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
@@ -807,6 +808,7 @@ func TestResultsCacheRecent(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
@@ -871,6 +873,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
@@ -910,6 +913,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
@@ -1021,6 +1025,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
PrometheusResponseExtractor{},
nil,
tc.shouldCache,
false,
nil,
)
require.NoError(t, err)
5 changes: 4 additions & 1 deletion pkg/querier/queryrange/roundtrip.go
@@ -45,6 +45,7 @@ func NewTripperware(
limits Limits,
schema config.SchemaConfig,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
retentionEnabled bool,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
metrics := NewMetrics(registerer)
@@ -65,7 +66,7 @@
}

metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c,
cacheGenNumLoader, PrometheusExtractor{}, metrics, registerer)
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, registerer)
if err != nil {
return nil, nil, err
}
@@ -395,6 +396,7 @@ func NewMetricTripperware(
codec queryrangebase.Codec,
c cache.Cache,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
retentionEnabled bool,
extractor queryrangebase.Extractor,
metrics *Metrics,
registerer prometheus.Registerer,
@@ -427,6 +429,7 @@
func(r queryrangebase.Request) bool {
return !r.GetCachingOptions().Disabled
},
retentionEnabled,
metrics.ResultsCacheMetrics,
)
if err != nil {
16 changes: 8 additions & 8 deletions pkg/querier/queryrange/roundtrip_test.go
@@ -110,7 +110,7 @@
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -173,7 +173,7 @@
}

func TestLogFilterTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -222,7 +222,7 @@
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -258,7 +258,7 @@
}

func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -299,7 +299,7 @@
}

func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -345,7 +345,7 @@
}

func TestLogNoFilter(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -381,7 +381,7 @@

func TestRegexpParamsSupport(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -464,7 +464,7 @@ func TestPostQueries(t *testing.T) {
}

func TestEntriesLimitsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
42 changes: 42 additions & 0 deletions pkg/storage/stores/indexshipper/compactor/compactor_client.go
@@ -0,0 +1,42 @@
package compactor

import (
"flag"
"net/http"
"time"

"github.com/grafana/dskit/crypto/tls"
)

// ClientConfig for the compactor's HTTP client shared by the delete and generation-number clients
type ClientConfig struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
prefix := "boltdb.shipper.compactor.client"
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false,
"Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.")
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}

// NewCompactorHTTPClient returns a pointer to an HTTP client instance based on the
// compactor client TLS settings.
func NewCompactorHTTPClient(cfg ClientConfig) (*http.Client, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 250
transport.MaxIdleConnsPerHost = 250

if cfg.TLSEnabled {
tlsCfg, err := cfg.TLS.GetTLSConfig()
if err != nil {
return nil, err
}

transport.TLSClientConfig = tlsCfg
}

return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil
}
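
For context, a minimal usage sketch of the new shared client. This is not part of the diff; the dskit tls.ClientConfig field names (CAPath, CertPath, KeyPath) and the file paths are assumptions, and error handling is reduced to a panic.

package main

import (
	"github.com/grafana/dskit/crypto/tls"

	"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
)

func main() {
	// One client config now covers both the delete and the gen-number clients.
	cfg := compactor.ClientConfig{
		TLSEnabled: true,
		TLS: tls.ClientConfig{
			CAPath:   "/etc/loki/ca.crt", // placeholder paths
			CertPath: "/etc/loki/tls.crt",
			KeyPath:  "/etc/loki/tls.key",
		},
	}

	httpClient, err := compactor.NewCompactorHTTPClient(cfg)
	if err != nil {
		panic(err)
	}

	// The same *http.Client is then handed to generationnumber.NewGenNumberClient
	// and deletion.NewDeleteRequestsClient, as wired up in pkg/loki/modules.go above.
	_ = httpClient
}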
pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go
@@ -3,7 +3,6 @@ package deletion
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
@@ -14,8 +13,6 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/dskit/crypto/tls"

"github.com/grafana/loki/pkg/util/log"
)

@@ -24,20 +21,6 @@ const (
getDeletePath = "/loki/api/v1/delete"
)

// Config for compactor's delete client
type Config struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
prefix := "boltdb.shipper.compactor.delete_client"
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false,
"Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.")
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}

type DeleteRequestsClient interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
Stop()
@@ -69,25 +52,6 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption {
}
}

// NewDeleteHTTPClient return a pointer to a http client instance based on the
// delete client tls settings.
func NewDeleteHTTPClient(cfg Config) (*http.Client, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 250
transport.MaxIdleConnsPerHost = 250

if cfg.TLSEnabled {
tlsCfg, err := cfg.TLS.GetTLSConfig()
if err != nil {
return nil, err
}

transport.TLSClientConfig = tlsCfg
}

return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil
}

func NewDeleteRequestsClient(addr string, c httpClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) {
u, err := url.Parse(addr)
if err != nil {