Skip to content

Commit

Permalink
*: get rid of store info api
Browse files Browse the repository at this point in the history
We have supported the Info gRPC API for 3 years now. We used to fall back to
the Store API Info call when we encountered an endpoint that did not implement
the Info gRPC API, but that should no longer happen.

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Sep 6, 2024
1 parent 0966192 commit 097ba70
Show file tree
Hide file tree
Showing 17 changed files with 122 additions and 1,068 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#7494](https://github.com/thanos-io/thanos/pull/7494) Ruler: remove trailing period from SRV records returned by discovery `dnsnosrva` lookups
- [#7567](https://github.com/thanos-io/thanos/pull/7565) Query: Use thanos resolver for endpoint groups.
- [#7704](https://github.com/thanos-io/thanos/pull/7704) *: *breaking :warning:* remove Store gRPC Info function. This has been deprecated for 3 years; it's time to remove it.

### Removed

Expand Down
3 changes: 1 addition & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ func registerQuery(app *extkingpin.App) {
*webDisableCORS,
*alertQueryURL,
*grpcProxyStrategy,
component.Query,
*queryTelemetryDurationQuantiles,
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
Expand Down Expand Up @@ -437,7 +436,6 @@ func runQuery(
disableCORS bool,
alertQueryURL string,
grpcProxyStrategy string,
comp component.Component,
queryTelemetryDurationQuantiles []float64,
queryTelemetrySamplesQuantiles []float64,
queryTelemetrySeriesQuantiles []float64,
Expand All @@ -452,6 +450,7 @@ func runQuery(
enforceTenancy bool,
tenantLabel string,
) error {
comp := component.Query
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
if lastColon != -1 {
Expand Down
56 changes: 20 additions & 36 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

package component

import (
"strings"

"github.com/thanos-io/thanos/pkg/store/storepb"
)

// Component is a generic component interface.
type Component interface {
String() string
Expand All @@ -18,7 +12,6 @@ type Component interface {
type StoreAPI interface {
implementsStoreAPI()
String() string
ToProto() storepb.StoreType
}

// Source is a Thanos component that produce blocks of metrics.
Expand All @@ -33,7 +26,6 @@ type SourceStoreAPI interface {
implementsStoreAPI()
producesBlocks()
String() string
ToProto() storepb.StoreType
}

type component struct {
Expand All @@ -48,14 +40,6 @@ type storeAPI struct {

func (storeAPI) implementsStoreAPI() {}

// ToProto maps this component's name to its storepb.StoreType enum value.
// An unknown name maps to the enum's zero value via the generated value map.
func (s sourceStoreAPI) ToProto() storepb.StoreType {
	name := strings.ToUpper(s.String())
	return storepb.StoreType(storepb.StoreType_value[name])
}

// ToProto maps this component's name to its storepb.StoreType enum value.
// An unknown name maps to the enum's zero value via the generated value map.
func (s storeAPI) ToProto() storepb.StoreType {
	name := strings.ToUpper(s.String())
	return storepb.StoreType(storepb.StoreType_value[name])
}

type source struct {
component
}
Expand All @@ -68,26 +52,6 @@ type sourceStoreAPI struct {
storeAPI
}

// FromProto converts from a gRPC StoreType to StoreAPI.
// Any store type without a known mapping yields UnknownStoreAPI.
func FromProto(storeType storepb.StoreType) StoreAPI {
	byType := map[storepb.StoreType]StoreAPI{
		storepb.StoreType_QUERY:   Query,
		storepb.StoreType_RULE:    Rule,
		storepb.StoreType_SIDECAR: Sidecar,
		storepb.StoreType_STORE:   Store,
		storepb.StoreType_RECEIVE: Receive,
		storepb.StoreType_DEBUG:   Debug,
	}
	if api, ok := byType[storeType]; ok {
		return api
	}
	return UnknownStoreAPI
}

func FromString(storeType string) StoreAPI {
switch storeType {
case "query":
Expand Down Expand Up @@ -125,4 +89,24 @@ var (
Store = storeAPI{component: component{name: "store"}}
UnknownStoreAPI = storeAPI{component: component{name: "unknown-store-api"}}
Query = storeAPI{component: component{name: "query"}}

All = []Component{
Bucket,
Cleanup,
Mark,
Upload,
Rewrite,
Retention,
Compact,
Downsample,
Replicate,
QueryFrontend,
Debug,
Receive,
Rule,
Sidecar,
Store,
UnknownStoreAPI,
Query,
}
)
109 changes: 12 additions & 97 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,91 +83,9 @@ func (es *endpointRef) Metadata(ctx context.Context, infoClient infopb.InfoClien
return &endpointMetadata{resp}, nil
}
}

// Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI.
if storeClient != nil {
metadata, err := es.getMetadataUsingStoreAPI(ctx, storeClient)
if err != nil {
return nil, errors.Wrapf(err, "fallback fetching info from %s", es.addr)
}
return metadata, nil
}

return nil, errors.New(noMetadataEndpointMessage)
}

// getMetadataUsingStoreAPI builds endpoint metadata from the legacy Store Info
// RPC, synthesizing the InfoResponse a component of the reported type would be
// expected to serve.
func (es *endpointRef) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) {
	info, err := client.Info(ctx, &storepb.InfoRequest{})
	if err != nil {
		return nil, err
	}

	comp := component.FromProto(info.StoreType)
	filled := fillExpectedAPIs(comp, info.MinTime, info.MaxTime)
	filled.LabelSets = info.LabelSets
	filled.ComponentType = comp.String()

	return &endpointMetadata{&filled}, nil
}

// fillExpectedAPIs constructs the InfoResponse a component of the given type
// is expected to expose, from only its type and time range. Component types
// without a known API surface yield an empty response.
func fillExpectedAPIs(componentType component.Component, mintime, maxTime int64) infopb.InfoResponse {
	// Every known component advertises the same store time range.
	store := &infopb.StoreInfo{
		MinTime: mintime,
		MaxTime: maxTime,
	}

	switch componentType {
	case component.Sidecar:
		return infopb.InfoResponse{
			Store:          store,
			Rules:          &infopb.RulesInfo{},
			Targets:        &infopb.TargetsInfo{},
			MetricMetadata: &infopb.MetricMetadataInfo{},
			Exemplars:      &infopb.ExemplarsInfo{},
		}
	case component.Query:
		return infopb.InfoResponse{
			Store:          store,
			Rules:          &infopb.RulesInfo{},
			Targets:        &infopb.TargetsInfo{},
			MetricMetadata: &infopb.MetricMetadataInfo{},
			Exemplars:      &infopb.ExemplarsInfo{},
			Query:          &infopb.QueryAPIInfo{},
		}
	case component.Receive:
		return infopb.InfoResponse{
			Store:     store,
			Exemplars: &infopb.ExemplarsInfo{},
		}
	case component.Store:
		return infopb.InfoResponse{
			Store: store,
		}
	case component.Rule:
		return infopb.InfoResponse{
			Store: store,
			Rules: &infopb.RulesInfo{},
		}
	default:
		return infopb.InfoResponse{}
	}
}

// stringError forces the error to be a string
// when marshaled into a JSON.
type stringError struct {
Expand Down Expand Up @@ -199,7 +117,7 @@ type EndpointStatus struct {
// TODO(hitanshu-mehta) Currently,only collecting metrics of storeEndpoints. Make this struct generic.
type endpointSetNodeCollector struct {
mtx sync.Mutex
storeNodes map[component.Component]map[string]int
storeNodes map[string]map[string]int
storePerExtLset map[string]int

logger log.Logger
Expand All @@ -213,7 +131,7 @@ func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointS
}
return &endpointSetNodeCollector{
logger: logger,
storeNodes: map[component.Component]map[string]int{},
storeNodes: map[string]map[string]int{},
connectionsDesc: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections",
"Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.",
Expand All @@ -236,8 +154,8 @@ func truncateExtLabels(s string, threshold int) string {
}
return s
}
func (c *endpointSetNodeCollector) Update(nodes map[component.Component]map[string]int) {
storeNodes := make(map[component.Component]map[string]int, len(nodes))
func (c *endpointSetNodeCollector) Update(nodes map[string]map[string]int) {
storeNodes := make(map[string]map[string]int, len(nodes))
storePerExtLset := map[string]int{}

for storeType, occurrencesPerExtLset := range nodes {
Expand All @@ -263,12 +181,9 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
c.mtx.Lock()
defer c.mtx.Unlock()

for storeType, occurrencesPerExtLset := range c.storeNodes {
for _, occurrencesPerExtLset := range c.storeNodes {
for externalLabels, occurrences := range occurrencesPerExtLset {
var storeTypeStr string
if storeType != nil {
storeTypeStr = storeType.String()
}
// Select only required labels.
lbls := []string{}
for _, lbl := range c.labels {
Expand Down Expand Up @@ -454,12 +369,12 @@ func (e *EndpointSet) Update(ctx context.Context) {

// All producers that expose StoreAPI should have unique external labels. Check all which connect to our Querier.
if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) &&
stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 {
stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset] > 0 {

level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket",
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar][extLset]+stats[component.Rule][extLset]+1))
"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset]+1))
}
stats[er.ComponentType()][extLset]++
stats[er.ComponentType().String()][extLset]++
}

e.endpointsMetric.Update(stats)
Expand Down Expand Up @@ -905,10 +820,10 @@ type endpointMetadata struct {
*infopb.InfoResponse
}

func newEndpointAPIStats() map[component.Component]map[string]int {
nodes := make(map[component.Component]map[string]int, len(storepb.StoreType_name))
for i := range storepb.StoreType_name {
nodes[component.FromProto(storepb.StoreType(i))] = map[string]int{}
// newEndpointAPIStats returns an empty per-component occurrence map, with one
// pre-created inner map for every known component name.
func newEndpointAPIStats() map[string]map[string]int {
	stats := make(map[string]map[string]int, len(component.All))
	for _, c := range component.All {
		stats[c.String()] = map[string]int{}
	}
	return stats
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ func (m *seriesClientMapper) SendMsg(_ interface{}) error {
return nil
}

// Info implements storepb.StoreClient by delegating directly to the wrapped
// in-process store; the gRPC call options are ignored since no network hop occurs.
func (l *localClient) Info(ctx context.Context, in *storepb.InfoRequest, opts ...grpc.CallOption) (*storepb.InfoResponse, error) {
	return l.store.Info(ctx, in)
}

func (l *localClient) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (storepb.Store_SeriesClient, error) {
return &seriesClientMapper{ctx: ctx, store: l.store, req: *in}, nil
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/info/infopb"
Expand Down Expand Up @@ -950,19 +949,6 @@ func (s *BucketStore) LabelSet() []labelpb.LabelSet {
return labelSets
}

// Info implements the storepb.StoreServer interface, reporting this store's
// type, advertised label sets, and current time range.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
	minTime, maxTime := s.TimeRange()
	return &storepb.InfoResponse{
		StoreType: component.Store.ToProto(),
		MinTime:   minTime,
		MaxTime:   maxTime,
		LabelSets: s.LabelSet(),
	}, nil
}

func (s *BucketStore) limitMinTime(mint int64) int64 {
if s.filterConfig == nil {
return mint
Expand Down
49 changes: 0 additions & 49 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,47 +735,6 @@ func TestBucketStore_TSDBInfo(t *testing.T) {
})
}

// TestBucketStore_Info verifies that a freshly constructed, empty BucketStore
// reports the STORE type, no label sets, and an inverted (empty) time range
// via the Info RPC.
func TestBucketStore_Info(t *testing.T) {
	defer custom.TolerantVerifyLeak(t)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tmpDir := t.TempDir()

	pool, err := NewDefaultChunkBytesPool(2e5)
	testutil.Ok(t, err)

	bs, err := NewBucketStore(
		nil,
		nil,
		tmpDir,
		NewChunksLimiterFactory(0),
		NewSeriesLimiterFactory(0),
		NewBytesLimiterFactory(0),
		NewGapBasedPartitioner(PartitionerMaxGapSize),
		20,
		true,
		DefaultPostingOffsetInMemorySampling,
		false,
		false,
		0,
		WithChunkPool(pool),
		WithFilterConfig(allowAllFilterConf),
	)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, bs.Close()) }()

	resp, err := bs.Info(ctx, &storepb.InfoRequest{})
	testutil.Ok(t, err)

	// A store with no blocks advertises MinTime=MaxInt64 and MaxTime=MinInt64.
	testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType)
	testutil.Equals(t, int64(math.MaxInt64), resp.MinTime)
	testutil.Equals(t, int64(math.MinInt64), resp.MaxTime)
	testutil.Equals(t, []labelpb.LabelSet(nil), resp.LabelSets)
	testutil.Equals(t, []labelpb.Label(nil), resp.Labels)
}

type recorder struct {
mtx sync.Mutex
objstore.Bucket
Expand Down Expand Up @@ -1022,14 +981,6 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
})
testutil.Equals(t, sc.expectedIDs, ids)

// Check Info endpoint.
resp, err := bucketStore.Info(context.Background(), &storepb.InfoRequest{})
testutil.Ok(t, err)

testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType)
testutil.Equals(t, []labelpb.Label(nil), resp.Labels)
testutil.Equals(t, sc.expectedAdvLabels, resp.LabelSets)

// Make sure we don't download files we did not expect to.
// Regression test: https://github.com/thanos-io/thanos/issues/1664

Expand Down
Loading

0 comments on commit 097ba70

Please sign in to comment.