diff --git a/CHANGELOG.md b/CHANGELOG.md index ea029e2170..4e447d5b69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. - [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants - [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency` allowing to configure number of go routines for download blocks during compactions. +- [#5480](https://github.com/thanos-io/thanos/pull/5480) Query: Added hidden `--endpoint.info-timeout` flag to configure the timeout of gRPC Info requests (default `5s`). - [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write. - [#5520](https://github.com/thanos-io/thanos/pull/5520) Receive: Meta-monitoring based active series limiting. - [#5555](https://github.com/thanos-io/thanos/pull/5555) Query: Added `--query.active-query-path` flag, allowing the user to configure the directory to create an active query tracking file, `queries.active`, for different resolution. 
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 53d2080626..ad0bad9e87 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -147,6 +147,8 @@ func registerQuery(app *extkingpin.App) { unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) + endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) + enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified."). Default("false").Bool() @@ -279,6 +281,7 @@ func registerQuery(app *extkingpin.App) { time.Duration(*dnsSDInterval), *dnsSDResolver, time.Duration(*unhealthyStoreTimeout), + time.Duration(*endpointInfoTimeout), time.Duration(*instantDefaultMaxSourceResolution), *defaultMetadataTimeRange, *strictStores, @@ -347,6 +350,7 @@ func runQuery( dnsSDInterval time.Duration, dnsSDResolver string, unhealthyStoreTimeout time.Duration, + endpointInfoTimeout time.Duration, instantDefaultMaxSourceResolution time.Duration, defaultMetadataTimeRange time.Duration, strictStores []string, @@ -459,6 +463,7 @@ func runQuery( }, dialOpts, unhealthyStoreTimeout, + endpointInfoTimeout, ) proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout) rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 1b91adde80..c84d942938 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -248,7 +248,7 @@ type EndpointSet struct { // accessible and we close gRPC client for it, unless it is strict. 
endpointSpec func() map[string]*GRPCEndpointSpec dialOpts []grpc.DialOption - gRPCInfoCallTimeout time.Duration + endpointInfoTimeout time.Duration unhealthyEndpointTimeout time.Duration updateMtx sync.Mutex @@ -272,6 +272,7 @@ func NewEndpointSet( endpointSpecs func() []*GRPCEndpointSpec, dialOpts []grpc.DialOption, unhealthyEndpointTimeout time.Duration, + endpointInfoTimeout time.Duration, ) *EndpointSet { endpointsMetric := newEndpointSetNodeCollector() if reg != nil { @@ -292,9 +293,8 @@ func NewEndpointSet( endpointsMetric: endpointsMetric, dialOpts: dialOpts, - gRPCInfoCallTimeout: 5 * time.Second, + endpointInfoTimeout: endpointInfoTimeout, unhealthyEndpointTimeout: unhealthyEndpointTimeout, - endpointSpec: func() map[string]*GRPCEndpointSpec { specs := make(map[string]*GRPCEndpointSpec) for _, s := range endpointSpecs() { @@ -327,7 +327,7 @@ func (e *EndpointSet) Update(ctx context.Context) { wg.Add(1) go func(spec *GRPCEndpointSpec) { defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, e.gRPCInfoCallTimeout) + ctx, cancel := context.WithTimeout(ctx, e.endpointInfoTimeout) defer cancel() e.updateEndpoint(ctx, spec, er) @@ -342,7 +342,7 @@ func (e *EndpointSet) Update(ctx context.Context) { wg.Add(1) go func(spec *GRPCEndpointSpec) { defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, e.gRPCInfoCallTimeout) + ctx, cancel := context.WithTimeout(ctx, e.endpointInfoTimeout) defer cancel() newRef, err := e.newEndpointRef(ctx, spec) diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 05d5d366f5..1274648b93 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -373,7 +373,6 @@ func TestEndpointSetUpdate_DuplicateSpecs(t *testing.T) { discoveredEndpointAddr = append(discoveredEndpointAddr, discoveredEndpointAddr[0]) endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now) - endpointSet.gRPCInfoCallTimeout = 1 * time.Second defer endpointSet.Close() 
endpointSet.Update(context.Background()) @@ -396,7 +395,6 @@ func TestEndpointSetUpdate_EndpointGoingAway(t *testing.T) { discoveredEndpointAddr := endpoints.EndpointAddresses() endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now) - endpointSet.gRPCInfoCallTimeout = 1 * time.Second defer endpointSet.Close() // Initial update. @@ -559,7 +557,7 @@ func TestEndpointSetUpdate_AtomicEndpointAdditions(t *testing.T) { updateTime := time.Now() discoveredEndpointAddr := endpoints.EndpointAddresses() endpointSet := makeEndpointSet(discoveredEndpointAddr, false, func() time.Time { return updateTime }) - endpointSet.gRPCInfoCallTimeout = 3 * time.Second + endpointSet.endpointInfoTimeout = 3 * time.Second defer endpointSet.Close() var wg sync.WaitGroup @@ -648,8 +646,7 @@ func TestEndpointSet_Update(t *testing.T) { } return specs }, - testGRPCOpts, time.Minute) - endpointSet.gRPCInfoCallTimeout = 2 * time.Second + testGRPCOpts, time.Minute, 2*time.Second) defer endpointSet.Close() // Initial update. @@ -1030,8 +1027,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { } return specs }, - testGRPCOpts, time.Minute) - endpointSet.gRPCInfoCallTimeout = 2 * time.Second + testGRPCOpts, time.Minute, 2*time.Second) defer endpointSet.Close() // Should not matter how many of these we run. @@ -1140,9 +1136,8 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { NewGRPCEndpointSpec(discoveredEndpointAddr[1], false), NewGRPCEndpointSpec(discoveredEndpointAddr[2], true), } - }, testGRPCOpts, time.Minute) + }, testGRPCOpts, time.Minute, 1*time.Second) defer endpointSet.Close() - endpointSet.gRPCInfoCallTimeout = 1 * time.Second // Initial update. endpointSet.Update(context.Background()) @@ -1161,12 +1156,12 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { testutil.Equals(t, int64(54321), curMax, "got incorrect minimum time") // Successfully retrieve the information and observe minTime/maxTime updating. 
- endpointSet.gRPCInfoCallTimeout = 3 * time.Second + endpointSet.endpointInfoTimeout = 3 * time.Second endpointSet.Update(context.Background()) updatedCurMin, updatedCurMax := endpointSet.endpoints[slowStaticEndpointAddr].metadata.Store.MinTime, endpointSet.endpoints[slowStaticEndpointAddr].metadata.Store.MaxTime testutil.Equals(t, int64(65644), updatedCurMin) testutil.Equals(t, int64(77777), updatedCurMax) - endpointSet.gRPCInfoCallTimeout = 1 * time.Second + endpointSet.endpointInfoTimeout = 1 * time.Second // Turn off the endpoints. endpoints.Close() @@ -1320,7 +1315,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { return tc.states[currentState].endpointSpec() }, - testGRPCOpts, time.Minute) + testGRPCOpts, time.Minute, 2*time.Second) defer endpointSet.Close() @@ -1506,9 +1501,7 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc) } return specs }, - testGRPCOpts, time.Minute) - endpointSet.gRPCInfoCallTimeout = 1 * time.Second - + testGRPCOpts, time.Minute, time.Second) return endpointSet }