From 8d24d8adc81933b63ccc4a2d0f49c42a0f1bea92 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 14 Oct 2019 11:06:19 +0100 Subject: [PATCH] Removed Querier duplicate labels checks & made sure store will be not blocked for older Queriers. (#1636) Changes: * Cleaned up the code a bit. * Ensured proper locks. * thanos_store_nodes_grpc_connections is not per store type and external labels. thanos_store_node_info is marked as deprecated. * Added (optional, but enabled by default) compatibility label to store GW Info method with flag to disable it. Store only adds it if there is any labelset to advertise. * Disabled any strict deduplication in external labels detection; just warning! * More tests; compatibility tests for query pre 0.8.0 and new store GW. * Remove compatibility label if spotted. Fixes https://github.com/thanos-io/thanos/issues/1632 Signed-off-by: Bartek Plotka --- CHANGELOG.md | 9 +- cmd/thanos/store.go | 6 + .../test-storeset-pre-v0.8.0/storeset.go | 444 +++++++++++++ .../test-storeset-pre-v0.8.0/storeset_test.go | 224 +++++++ pkg/query/storeset.go | 415 +++++++----- pkg/query/storeset_test.go | 594 ++++++++++++------ pkg/store/bucket.go | 52 +- pkg/store/bucket_e2e_test.go | 4 +- pkg/store/bucket_test.go | 51 +- 9 files changed, 1409 insertions(+), 390 deletions(-) create mode 100644 pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go create mode 100644 pkg/query/internal/test-storeset-pre-v0.8.0/storeset_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eec4637180..6c7c81bc85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Unreleased +### Fixed + +- [#1632](https://github.com/thanos-io/thanos/issues/1632) Removes the duplicated external labels detection on Thanos Querier; warning only; Made Store Gateway compatible with older Querier versions. + NOTE: `thanos_store_nodes_grpc_connections` metric is now per `external_labels` and `store_type`. It is a recommended metric for Querier storeAPIs. `thanos_store_node_info` is marked as obsolete and will be removed in next release. + NOTE2: Store Gateway is not advertising artificial: `"@thanos_compatibility_store_type=store"` label. This is to have current Store Gateway compatible with Querier pre v0.8.0. +This label can be disabled by hidden `debug.advertise-compatibility-label=false` flag on Store Gateway. + ## [v0.8.0](https://github.com/thanos-io/thanos/releases/tag/v0.8.0) - 2019.10.10 Lot's of improvements this release! Noteworthy items: @@ -51,7 +58,7 @@ Selecting blocks to serve depends on the result of block labels relabeling. - [#1362](https://github.com/thanos-io/thanos/pull/1362) `query.replica-label` configuration can be provided more than once for multiple deduplication labels like: `--query.replica-label=prometheus_replica --query.replica-label=service`. - [#1581](https://github.com/thanos-io/thanos/pull/1581) Thanos Store now can use smaller buffer sizes for Bytes pool; reducing memory for some requests. -- [#1622](https://github.com/thanos-io/thanos/pull/1622) & [#1590](https://github.com/thanos-io/thanos/pull/1590) Updated to Go 1.13.1 +- [#1622](https://github.com/thanos-io/thanos/pull/1622) & [#1590](https://github.com/thanos-io/thanos/pull/1590) Upgraded to Go 1.13.1 - [#1498](https://github.com/thanos-io/thanos/pull/1498) Thanos Receive change flag `labels` to `label` to be consistent with other commands. ### Fixed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index fb5b6f0431..fdb84f1647 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -60,6 +60,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store will serve only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z")) + advertiseCompatibilityLabel := cmd.Flag("debug.advertise-compatibility-label", "If true, Store Gateway in addition to other labels, will advertise special \"@thanos_compatibility_store_type=store\" label set. This makes store Gateway compatible with Querier before 0.8.0"). + Hidden().Default("true").Bool() + selectorRelabelConf := regSelectorRelabelFlags(cmd) m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { @@ -92,6 +95,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { MaxTime: *maxTime, }, selectorRelabelConf, + *advertiseCompatibilityLabel, ) } } @@ -119,6 +123,7 @@ func runStore( blockSyncConcurrency int, filterConf *store.FilterConfig, selectorRelabelConf *extflag.PathOrContent, + advertiseCompatibilityLabel bool, ) error { statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) @@ -173,6 +178,7 @@ func runStore( blockSyncConcurrency, filterConf, relabelConfig, + advertiseCompatibilityLabel, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go new file mode 100644 index 0000000000..041796d9f5 --- /dev/null +++ b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go @@ -0,0 +1,444 @@ +/* +This package is for compatibility testing purposes. It is a code from v0.7.0 Querier. +*/ + +package testoldstoreset + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb/labels" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc" +) + +const ( + unhealthyStoreMessage = "removing store because it's unhealthy or does not exist" + droppingStoreMessage = "dropping store, external labels are not unique" +) + +type StoreSpec interface { + // Addr returns StoreAPI Address for the store spec. It is used as ID for store. + Addr() string + // Metadata returns current labels, store type and min, max ranges for store. + // It can change for every call for this method. + // If metadata call fails we assume that store is no longer accessible and we should not use it. + // NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage + // given store connection. + Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error) +} + +type StoreStatus struct { + Name string + LastCheck time.Time + LastError error + LabelSets []storepb.LabelSet + StoreType component.StoreAPI + MinTime int64 + MaxTime int64 +} + +type grpcStoreSpec struct { + addr string +} + +// NewGRPCStoreSpec creates store pure gRPC spec. +// It uses Info gRPC call to get Metadata. +func NewGRPCStoreSpec(addr string) StoreSpec { + return &grpcStoreSpec{addr: addr} +} + +func (s *grpcStoreSpec) Addr() string { + // API addr should not change between state changes. + return s.addr +} + +// Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after +// that time, we assume that the host is unhealthy and return error. +func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error) { + resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) + if err != nil { + return nil, 0, 0, errors.Wrapf(err, "fetching store info from %s", s.addr) + } + if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 { + resp.LabelSets = []storepb.LabelSet{{Labels: resp.Labels}} + } + + return resp.LabelSets, resp.MinTime, resp.MaxTime, nil +} + +// StoreSet maintains a set of active stores. It is backed up by Store Specifications that are dynamically fetched on +// every Update() call. +type StoreSet struct { + logger log.Logger + + // Store specifications can change dynamically. If some store is missing from the list, we assuming it is no longer + // accessible and we close gRPC client for it. + storeSpecs func() []StoreSpec + dialOpts []grpc.DialOption + gRPCInfoCallTimeout time.Duration + + mtx sync.RWMutex + storesStatusesMtx sync.RWMutex + stores map[string]*storeRef + storeNodeConnections prometheus.Gauge + externalLabelOccurrencesInStores map[string]int + storeStatuses map[string]*StoreStatus + unhealthyStoreTimeout time.Duration +} + +type storeSetNodeCollector struct { + externalLabelOccurrences func() map[string]int +} + +var nodeInfoDesc = prometheus.NewDesc( + "thanos_store_node_info", + "Number of nodes with the same external labels identified by their hash. If any time-series is larger than 1, external label uniqueness is not true", + []string{"external_labels"}, nil, +) + +func (c *storeSetNodeCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- nodeInfoDesc +} + +func (c *storeSetNodeCollector) Collect(ch chan<- prometheus.Metric) { + externalLabelOccurrences := c.externalLabelOccurrences() + for externalLabels, occurrences := range externalLabelOccurrences { + ch <- prometheus.MustNewConstMetric(nodeInfoDesc, prometheus.GaugeValue, float64(occurrences), externalLabels) + } +} + +// NewStoreSet returns a new set of stores from cluster peers and statically configured ones. +func NewStoreSet( + logger log.Logger, + reg *prometheus.Registry, + storeSpecs func() []StoreSpec, + dialOpts []grpc.DialOption, + unhealthyStoreTimeout time.Duration, +) *StoreSet { + storeNodeConnections := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "thanos_store_nodes_grpc_connections", + Help: "Number indicating current number of gRPC connection to store nodes. This indicates also to how many stores query node have access to.", + }) + + if logger == nil { + logger = log.NewNopLogger() + } + if reg != nil { + reg.MustRegister(storeNodeConnections) + } + if storeSpecs == nil { + storeSpecs = func() []StoreSpec { return nil } + } + + ss := &StoreSet{ + logger: log.With(logger, "component", "storeset"), + storeSpecs: storeSpecs, + dialOpts: dialOpts, + storeNodeConnections: storeNodeConnections, + gRPCInfoCallTimeout: 10 * time.Second, + externalLabelOccurrencesInStores: map[string]int{}, + stores: make(map[string]*storeRef), + storeStatuses: make(map[string]*StoreStatus), + unhealthyStoreTimeout: unhealthyStoreTimeout, + } + + storeNodeCollector := &storeSetNodeCollector{externalLabelOccurrences: ss.externalLabelOccurrences} + if reg != nil { + reg.MustRegister(storeNodeCollector) + } + + return ss +} + +type storeRef struct { + storepb.StoreClient + + mtx sync.RWMutex + cc *grpc.ClientConn + addr string + + // Meta (can change during runtime). + labelSets []storepb.LabelSet + storeType component.StoreAPI + minTime int64 + maxTime int64 + + logger log.Logger +} + +func (s *storeRef) Update(labelSets []storepb.LabelSet, minTime int64, maxTime int64) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.labelSets = labelSets + s.minTime = minTime + s.maxTime = maxTime +} + +func (s *storeRef) LabelSets() []storepb.LabelSet { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.labelSets +} + +func (s *storeRef) TimeRange() (int64, int64) { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.minTime, s.maxTime +} + +func (s *storeRef) String() string { + mint, maxt := s.TimeRange() + return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, storepb.LabelSetsToString(s.LabelSets()), mint, maxt) +} + +func (s *storeRef) Addr() string { + return s.addr +} + +func (s *storeRef) close() { + runutil.CloseWithLogOnErr(s.logger, s.cc, fmt.Sprintf("store %v connection close", s.addr)) +} + +// Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata +// from all stores. +func (s *StoreSet) Update(ctx context.Context) { + healthyStores := s.getHealthyStores(ctx) + + // Record the number of occurrences of external label combinations for current store slice. + externalLabelOccurrencesInStores := map[string]int{} + for _, st := range healthyStores { + externalLabelOccurrencesInStores[externalLabelsFromStore(st)]++ + } + level.Debug(s.logger).Log("msg", "updating healthy stores", "externalLabelOccurrencesInStores", fmt.Sprintf("%#+v", externalLabelOccurrencesInStores)) + + s.mtx.Lock() + defer s.mtx.Unlock() + + // Close stores that where not healthy this time (are not in healthy stores map). + for addr, store := range s.stores { + if _, ok := healthyStores[addr]; ok { + continue + } + + // Peer does not exists anymore. + store.close() + delete(s.stores, addr) + s.updateStoreStatus(store, errors.New(unhealthyStoreMessage)) + level.Info(s.logger).Log("msg", unhealthyStoreMessage, "address", addr) + } + + // Add stores that are not yet in s.stores. + for addr, store := range healthyStores { + if _, ok := s.stores[addr]; ok { + s.updateStoreStatus(store, nil) + continue + } + + externalLabels := externalLabelsFromStore(store) + if len(store.LabelSets()) > 0 && + externalLabelOccurrencesInStores[externalLabels] != 1 { + store.close() + s.updateStoreStatus(store, errors.New(droppingStoreMessage)) + level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr, "extLset", externalLabels, "duplicates", externalLabelOccurrencesInStores[externalLabels]) + // We don't want to block all of them. Leave one to not disrupt in terms of migration. + externalLabelOccurrencesInStores[externalLabels]-- + continue + } + + s.stores[addr] = store + s.updateStoreStatus(store, nil) + level.Info(s.logger).Log("msg", "adding new store to query storeset", "address", addr) + } + + s.externalLabelOccurrencesInStores = externalLabelOccurrencesInStores + s.storeNodeConnections.Set(float64(len(s.stores))) + s.cleanUpStoreStatuses() +} + +func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef { + var ( + unique = make(map[string]struct{}) + + healthyStores = make(map[string]*storeRef, len(s.stores)) + mtx sync.Mutex + wg sync.WaitGroup + ) + + // Gather healthy stores map concurrently. Build new store if does not exist already. + for _, storeSpec := range s.storeSpecs() { + if _, ok := unique[storeSpec.Addr()]; ok { + level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr()) + continue + } + unique[storeSpec.Addr()] = struct{}{} + + wg.Add(1) + go func(spec StoreSpec) { + defer wg.Done() + + addr := spec.Addr() + + ctx, cancel := context.WithTimeout(ctx, s.gRPCInfoCallTimeout) + defer cancel() + + store, ok := s.stores[addr] + if ok { + // Check existing store. Is it healthy? What are current metadata? + labelSets, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient) + if err != nil { + // Peer unhealthy. Do not include in healthy stores. + s.updateStoreStatus(store, err) + level.Warn(s.logger).Log("msg", "update of store node failed", "err", err, "address", addr) + return + } + store.Update(labelSets, minTime, maxTime) + } else { + // New store or was unhealthy and was removed in the past - create new one. + conn, err := grpc.DialContext(ctx, addr, s.dialOpts...) + if err != nil { + s.updateStoreStatus(&storeRef{addr: addr}, err) + level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr) + return + } + store = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger} + + // Initial info call for all types of stores to check gRPC StoreAPI. + resp, err := store.StoreClient.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) + if err != nil { + store.close() + s.updateStoreStatus(store, err) + level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "initial store client info fetch"), "address", addr) + return + } + if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 { + resp.LabelSets = []storepb.LabelSet{{Labels: resp.Labels}} + } + store.storeType = component.FromProto(resp.StoreType) + store.Update(resp.LabelSets, resp.MinTime, resp.MaxTime) + } + + mtx.Lock() + defer mtx.Unlock() + + healthyStores[addr] = store + }(storeSpec) + } + + wg.Wait() + + return healthyStores +} + +func externalLabelsFromStore(store *storeRef) string { + tsdbLabelSetStrings := make([]string, 0, len(store.labelSets)) + for _, ls := range store.labelSets { + tsdbLabels := labels.Labels(make([]labels.Label, 0, len(ls.Labels))) + for _, l := range ls.Labels { + tsdbLabels = append(tsdbLabels, labels.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Sort(tsdbLabels) + tsdbLabelSetStrings = append(tsdbLabelSetStrings, tsdbLabels.String()) + } + sort.Strings(tsdbLabelSetStrings) + return strings.Join(tsdbLabelSetStrings, ",") +} + +func (s *StoreSet) updateStoreStatus(store *storeRef, err error) { + s.storesStatusesMtx.Lock() + defer s.storesStatusesMtx.Unlock() + + status := StoreStatus{Name: store.addr} + prev, ok := s.storeStatuses[store.addr] + if ok { + status = *prev + } + + status.LastError = err + status.LastCheck = time.Now() + + if err == nil { + status.LabelSets = store.labelSets + status.StoreType = store.storeType + status.MinTime = store.minTime + status.MaxTime = store.maxTime + } + + s.storeStatuses[store.addr] = &status +} + +func (s *StoreSet) GetStoreStatus() []StoreStatus { + s.storesStatusesMtx.RLock() + defer s.storesStatusesMtx.RUnlock() + + statuses := make([]StoreStatus, 0, len(s.storeStatuses)) + for _, v := range s.storeStatuses { + statuses = append(statuses, *v) + } + + sort.Slice(statuses, func(i, j int) bool { + return statuses[i].Name < statuses[j].Name + }) + return statuses +} + +func (s *StoreSet) externalLabelOccurrences() map[string]int { + s.mtx.RLock() + defer s.mtx.RUnlock() + + r := make(map[string]int, len(s.externalLabelOccurrencesInStores)) + for k, v := range s.externalLabelOccurrencesInStores { + r[k] = v + } + + return r +} + +// Get returns a list of all active stores. +func (s *StoreSet) Get() []store.Client { + s.mtx.RLock() + defer s.mtx.RUnlock() + + stores := make([]store.Client, 0, len(s.stores)) + for _, st := range s.stores { + stores = append(stores, st) + } + return stores +} + +func (s *StoreSet) Close() { + for _, st := range s.stores { + st.close() + } +} + +func (s *StoreSet) cleanUpStoreStatuses() { + s.storesStatusesMtx.Lock() + defer s.storesStatusesMtx.Unlock() + + now := time.Now() + for addr, status := range s.storeStatuses { + if _, ok := s.stores[addr]; !ok { + if now.Sub(status.LastCheck) >= s.unhealthyStoreTimeout { + delete(s.storeStatuses, addr) + } + } + } +} diff --git a/pkg/query/internal/test-storeset-pre-v0.8.0/storeset_test.go b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset_test.go new file mode 100644 index 0000000000..37b94836b1 --- /dev/null +++ b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset_test.go @@ -0,0 +1,224 @@ +package testoldstoreset + +import ( + "context" + "fmt" + "math" + "net" + "os" + "testing" + "time" + + "github.com/thanos-io/thanos/pkg/store" + + "sort" + + "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var testGRPCOpts = []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), + grpc.WithInsecure(), +} + +type testStore struct { + info storepb.InfoResponse +} + +func (s *testStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) { + return &s.info, nil +} + +func (s *testStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + return status.Error(codes.Unimplemented, "not implemented") +} + +func (s *testStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( + *storepb.LabelNamesResponse, error, +) { + return nil, status.Error(codes.Unimplemented, "not implemented") +} + +func (s *testStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) ( + *storepb.LabelValuesResponse, error, +) { + return nil, status.Error(codes.Unimplemented, "not implemented") +} + +type testStoreMeta struct { + extlsetFn func(addr string) []storepb.LabelSet + storeType component.StoreAPI +} + +type testStores struct { + srvs map[string]*grpc.Server +} + +func startTestStores(stores []testStoreMeta) (*testStores, error) { + st := &testStores{ + srvs: map[string]*grpc.Server{}, + } + + for _, store := range stores { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + // Close so far started servers. + st.Close() + return nil, err + } + + srv := grpc.NewServer() + storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{LabelSets: store.extlsetFn(listener.Addr().String()), StoreType: store.storeType.ToProto()}}) + go func() { + _ = srv.Serve(listener) + }() + + st.srvs[listener.Addr().String()] = srv + } + + return st, nil +} + +func (s *testStores) StoreAddresses() []string { + var stores []string + for addr := range s.srvs { + stores = append(stores, addr) + } + return stores +} + +func (s *testStores) Close() { + for _, srv := range s.srvs { + srv.Stop() + } + s.srvs = nil +} + +func (s *testStores) CloseOne(addr string) { + srv, ok := s.srvs[addr] + if !ok { + return + } + + srv.Stop() + delete(s.srvs, addr) +} + +func specsFromAddrFunc(addrs []string) func() []StoreSpec { + return func() (specs []StoreSpec) { + for _, addr := range addrs { + specs = append(specs, NewGRPCStoreSpec(addr)) + } + return specs + } +} + +func TestPre0_8_0_StoreSet_AgainstNewStoreGW(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + st, err := startTestStores([]testStoreMeta{ + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + } + }, + }, + { + storeType: component.Store, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + // This is the labelset exposed by store when having only one sidecar's data. + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + { + Labels: []storepb.Label{{Name: store.CompatibilityTypeLabelName, Value: "store"}}, + }, + } + }, + }, + // We expect this to be duplicated. + { + storeType: component.Store, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + { + Labels: []storepb.Label{{Name: store.CompatibilityTypeLabelName, Value: "store"}}, + }, + } + }, + }, + }) + testutil.Ok(t, err) + defer st.Close() + + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + logger = level.NewFilter(logger, level.AllowDebug()) + logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + storeSet := NewStoreSet(logger, nil, specsFromAddrFunc(st.StoreAddresses()), testGRPCOpts, time.Minute) + storeSet.gRPCInfoCallTimeout = 2 * time.Second + defer storeSet.Close() + + // Should not matter how many of these we run. + storeSet.Update(context.Background()) + storeSet.Update(context.Background()) + storeSet.Update(context.Background()) + storeSet.Update(context.Background()) + + testutil.Assert(t, len(storeSet.stores) == 2, fmt.Sprintf("all services should respond just fine, but we expect duplicates being blocked. Expected %d stores, got %d", 5, len(storeSet.stores))) + + // Sort result to be able to compare. + var existingStoreLabels [][][]storepb.Label + for _, store := range storeSet.stores { + lset := [][]storepb.Label{} + for _, ls := range store.LabelSets() { + lset = append(lset, ls.Labels) + } + existingStoreLabels = append(existingStoreLabels, lset) + } + sort.Slice(existingStoreLabels, func(i, j int) bool { + return len(existingStoreLabels[i]) > len(existingStoreLabels[j]) + }) + + testutil.Equals(t, [][][]storepb.Label{ + { + { + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + { + {Name: store.CompatibilityTypeLabelName, Value: "store"}, + }, + }, + { + { + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + }, existingStoreLabels) +} diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index 1d49b0f73e..ebfa441611 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -22,7 +22,6 @@ import ( const ( unhealthyStoreMessage = "removing store because it's unhealthy or does not exist" - droppingStoreMessage = "dropping store, external labels are not unique" ) type StoreSpec interface { @@ -33,7 +32,7 @@ type StoreSpec interface { // If metadata call fails we assume that store is no longer accessible and we should not use it. // NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage // given store connection. - Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error) + Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error) } type StoreStatus struct { @@ -63,16 +62,86 @@ func (s *grpcStoreSpec) Addr() string { // Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after // that time, we assume that the host is unhealthy and return error. -func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error) { +func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error) { resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) if err != nil { - return nil, 0, 0, errors.Wrapf(err, "fetching store info from %s", s.addr) + return nil, 0, 0, nil, errors.Wrapf(err, "fetching store info from %s", s.addr) } if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 { resp.LabelSets = []storepb.LabelSet{{Labels: resp.Labels}} } - return resp.LabelSets, resp.MinTime, resp.MaxTime, nil + return resp.LabelSets, resp.MinTime, resp.MaxTime, component.FromProto(resp.StoreType), nil +} + +// storeSetNodeCollector is metric collector for Guge indicated number of available storeAPIs for Querier. +// Collector is requires as we want atomic updates for all 'thanos_store_nodes_grpc_connections' series. +type storeSetNodeCollector struct { + mtx sync.Mutex + storeNodes map[component.StoreAPI]map[string]int + storePerExtLset map[string]int + + metricDesc *prometheus.Desc + metricDesc2 *prometheus.Desc +} + +func newStoreSetNodeCollector() *storeSetNodeCollector { + return &storeSetNodeCollector{ + storeNodes: map[component.StoreAPI]map[string]int{}, + metricDesc: prometheus.NewDesc( + "thanos_store_nodes_grpc_connections", + "Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.", + []string{"external_labels", "store_type"}, nil, + ), + // TODO(bwplotka): Obsolete; Replaced by thanos_store_nodes_grpc_connections. + // Remove in next minor release. + metricDesc2: prometheus.NewDesc( + "thanos_store_node_info", + "Deprecated, use thanos_store_node_info instead.", + []string{"external_labels"}, nil, + ), + } +} + +func (c *storeSetNodeCollector) Update(nodes map[component.StoreAPI]map[string]int) { + storeNodes := make(map[component.StoreAPI]map[string]int, len(nodes)) + storePerExtLset := map[string]int{} + + for k, v := range nodes { + storeNodes[k] = make(map[string]int, len(v)) + for kk, vv := range v { + storePerExtLset[kk] += vv + storeNodes[k][kk] = vv + } + } + + c.mtx.Lock() + defer c.mtx.Unlock() + c.storeNodes = storeNodes + c.storePerExtLset = storePerExtLset +} + +func (c *storeSetNodeCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.metricDesc + ch <- c.metricDesc2 +} + +func (c *storeSetNodeCollector) Collect(ch chan<- prometheus.Metric) { + c.mtx.Lock() + defer c.mtx.Unlock() + + for storeType, occurrencesPerExtLset := range c.storeNodes { + for externalLabels, occurrences := range occurrencesPerExtLset { + var storeTypeStr string + if storeType != nil { + storeTypeStr = storeType.String() + } + ch <- prometheus.MustNewConstMetric(c.metricDesc, prometheus.GaugeValue, float64(occurrences), externalLabels, storeTypeStr) + } + } + for externalLabels, occur := range c.storePerExtLset { + ch <- prometheus.MustNewConstMetric(c.metricDesc2, prometheus.GaugeValue, float64(occur), externalLabels) + } } // StoreSet maintains a set of active stores. It is backed up by Store Specifications that are dynamically fetched on @@ -86,34 +155,17 @@ type StoreSet struct { dialOpts []grpc.DialOption gRPCInfoCallTimeout time.Duration - mtx sync.RWMutex - storesStatusesMtx sync.RWMutex - stores map[string]*storeRef - storeNodeConnections prometheus.Gauge - externalLabelOccurrencesInStores map[string]int - storeStatuses map[string]*StoreStatus - unhealthyStoreTimeout time.Duration -} - -type storeSetNodeCollector struct { - externalLabelOccurrences func() map[string]int -} + updateMtx sync.Mutex + storesMtx sync.RWMutex + storesStatusesMtx sync.RWMutex -var nodeInfoDesc = prometheus.NewDesc( - "thanos_store_node_info", - "Number of nodes with the same external labels identified by their hash. If any time-series is larger than 1, external label uniqueness is not true", - []string{"external_labels"}, nil, -) - -func (c *storeSetNodeCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- nodeInfoDesc -} + // Main map of stores currently used for fanout. + stores map[string]*storeRef + storesMetric *storeSetNodeCollector -func (c *storeSetNodeCollector) Collect(ch chan<- prometheus.Metric) { - externalLabelOccurrences := c.externalLabelOccurrences() - for externalLabels, occurrences := range externalLabelOccurrences { - ch <- prometheus.MustNewConstMetric(nodeInfoDesc, prometheus.GaugeValue, float64(occurrences), externalLabels) - } + // Map of statuses used only by UI. + storeStatuses map[string]*StoreStatus + unhealthyStoreTimeout time.Duration } // NewStoreSet returns a new set of stores from cluster peers and statically configured ones. @@ -124,38 +176,28 @@ func NewStoreSet( dialOpts []grpc.DialOption, unhealthyStoreTimeout time.Duration, ) *StoreSet { - storeNodeConnections := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "thanos_store_nodes_grpc_connections", - Help: "Number indicating current number of gRPC connection to store nodes. This indicates also to how many stores query node have access to.", - }) + storesMetric := newStoreSetNodeCollector() + if reg != nil { + reg.MustRegister(storesMetric) + } if logger == nil { logger = log.NewNopLogger() } - if reg != nil { - reg.MustRegister(storeNodeConnections) - } if storeSpecs == nil { storeSpecs = func() []StoreSpec { return nil } } ss := &StoreSet{ - logger: log.With(logger, "component", "storeset"), - storeSpecs: storeSpecs, - dialOpts: dialOpts, - storeNodeConnections: storeNodeConnections, - gRPCInfoCallTimeout: 10 * time.Second, - externalLabelOccurrencesInStores: map[string]int{}, - stores: make(map[string]*storeRef), - storeStatuses: make(map[string]*StoreStatus), - unhealthyStoreTimeout: unhealthyStoreTimeout, + logger: log.With(logger, "component", "storeset"), + storeSpecs: storeSpecs, + dialOpts: dialOpts, + storesMetric: storesMetric, + gRPCInfoCallTimeout: 10 * time.Second, + stores: make(map[string]*storeRef), + storeStatuses: make(map[string]*StoreStatus), + unhealthyStoreTimeout: unhealthyStoreTimeout, } - - storeNodeCollector := &storeSetNodeCollector{externalLabelOccurrences: ss.externalLabelOccurrences} - if reg != nil { - reg.MustRegister(storeNodeCollector) - } - return ss } @@ -175,22 +217,78 @@ type storeRef struct { logger log.Logger } -func (s *storeRef) Update(labelSets []storepb.LabelSet, minTime int64, maxTime int64) { +func (s *storeRef) Update(labelSets []storepb.LabelSet, minTime int64, maxTime int64, storeType component.StoreAPI) { s.mtx.Lock() defer s.mtx.Unlock() + s.storeType = storeType s.labelSets = labelSets s.minTime = minTime s.maxTime = maxTime } +func (s *storeRef) StoreType() component.StoreAPI { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.storeType +} + func (s *storeRef) LabelSets() []storepb.LabelSet { s.mtx.RLock() defer s.mtx.RUnlock() - return s.labelSets + + labelSet := make([]storepb.LabelSet, 0, len(s.labelSets)) + for _, ls := range s.labelSets { + if len(ls.Labels) == 0 { + continue + } + // Compatibility label for Queriers pre 0.8.1. Filter it out now. + if ls.Labels[0].Name == store.CompatibilityTypeLabelName { + continue + } + + lbls := make([]storepb.Label, 0, len(ls.Labels)) + for _, l := range ls.Labels { + lbls = append(lbls, storepb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + labelSet = append(labelSet, storepb.LabelSet{Labels: lbls}) + } + return labelSet } -func (s *storeRef) TimeRange() (int64, int64) { +func (s *storeRef) LabelSetsString() string { + s.mtx.RLock() + defer s.mtx.RUnlock() + + labelSet := make([]string, 0, len(s.labelSets)) + for _, ls := range s.labelSets { + if len(ls.Labels) == 0 { + continue + } + // Compatibility label for Queriers pre 0.8.1. Filter it out now. + if ls.Labels[0].Name == store.CompatibilityTypeLabelName { + continue + } + + lbls := labels.Labels(make([]labels.Label, 0, len(ls.Labels))) + for _, l := range ls.Labels { + lbls = append(lbls, labels.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Sort(lbls) + labelSet = append(labelSet, lbls.String()) + } + sort.Strings(labelSet) + return strings.Join(labelSet, ",") +} + +func (s *storeRef) TimeRange() (mint int64, maxt int64) { s.mtx.RLock() defer s.mtx.RUnlock() @@ -206,79 +304,87 @@ func (s *storeRef) Addr() string { return s.addr } -func (s *storeRef) close() { +func (s *storeRef) Close() { runutil.CloseWithLogOnErr(s.logger, s.cc, fmt.Sprintf("store %v connection close", s.addr)) } +func newStoreAPIStats() map[component.StoreAPI]map[string]int { + nodes := make(map[component.StoreAPI]map[string]int, len(storepb.StoreType_name)) + for i := range storepb.StoreType_name { + nodes[component.FromProto(storepb.StoreType(i))] = map[string]int{} + } + return nodes +} + // Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata // from all stores. func (s *StoreSet) Update(ctx context.Context) { - healthyStores := s.getHealthyStores(ctx) + s.updateMtx.Lock() + defer s.updateMtx.Unlock() - // Record the number of occurrences of external label combinations for current store slice. - externalLabelOccurrencesInStores := map[string]int{} - for _, st := range healthyStores { - externalLabelOccurrencesInStores[externalLabelsFromStore(st)]++ + s.storesMtx.RLock() + stores := make(map[string]*storeRef, len(s.stores)) + for addr, st := range s.stores { + stores[addr] = st } - level.Debug(s.logger).Log("msg", "updating healthy stores", "externalLabelOccurrencesInStores", fmt.Sprintf("%#+v", externalLabelOccurrencesInStores)) + s.storesMtx.RUnlock() - s.mtx.Lock() - defer s.mtx.Unlock() + level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores)) + + healthyStores := s.getHealthyStores(ctx, stores) + level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "healthyStores", len(healthyStores), "cachedStores", len(stores)) + + stats := newStoreAPIStats() // Close stores that where not healthy this time (are not in healthy stores map). - for addr, store := range s.stores { + for addr, st := range stores { if _, ok := healthyStores[addr]; ok { + stats[st.StoreType()][st.LabelSetsString()]++ continue } - // Peer does not exists anymore. - store.close() - delete(s.stores, addr) - s.updateStoreStatus(store, errors.New(unhealthyStoreMessage)) - level.Info(s.logger).Log("msg", unhealthyStoreMessage, "address", addr) + st.Close() + delete(stores, addr) + s.updateStoreStatus(st, errors.New(unhealthyStoreMessage)) + level.Info(s.logger).Log("msg", unhealthyStoreMessage, "address", addr, "extLset", st.LabelSetsString()) } - // Add stores that are not yet in s.stores. - for addr, store := range healthyStores { - if _, ok := s.stores[addr]; ok { - s.updateStoreStatus(store, nil) + // Add stores that are not yet in stores. + for addr, st := range healthyStores { + if _, ok := stores[addr]; ok { continue } - // Check if the store has some external labels specified and if any if there are duplicates. - // We warn and exclude duplicates because it unnecessarily puts additional load on querier, network and underlying StoreAPIs and - // it indicates misconfiguration. - // - // Note: No external labels means strictly store gateway or ruler and it is fine to have access to multiple instances of them. - // Any other component will error out if it will be configured with empty external labels. - externalLabels := externalLabelsFromStore(store) - if len(store.LabelSets()) > 0 && - store.storeType != nil && - store.storeType.ToProto() != storepb.StoreType_STORE && - externalLabelOccurrencesInStores[externalLabels] != 1 { - store.close() - s.updateStoreStatus(store, errors.New(droppingStoreMessage)) - level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr, "extLset", externalLabels, "duplicates", externalLabelOccurrencesInStores[externalLabels]) - // We don't want to block all of them. Leave one to not disrupt in terms of migration. - externalLabelOccurrencesInStores[externalLabels]-- - continue + extLset := st.LabelSetsString() + + // All producers should have unique external labels. While this does not check only StoreAPIs connected to + // this querier this allows to notify early user about misconfiguration. Warn only. This is also detectable from metric. + if st.StoreType() != nil && + (st.StoreType() == component.Sidecar || st.StoreType() == component.Rule) && + stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 { + + level.Warn(s.logger).Log("msg", "found duplicate storeAPI producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket", + "address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar][extLset]+stats[component.Rule][extLset]+1)) } + stats[st.StoreType()][st.LabelSetsString()]++ - s.stores[addr] = store - s.updateStoreStatus(store, nil) - level.Info(s.logger).Log("msg", "adding new store to query storeset", "address", addr) + stores[addr] = st + s.updateStoreStatus(st, nil) + level.Info(s.logger).Log("msg", "adding new storeAPI to query storeset", "address", addr, "extLset", extLset) } - s.externalLabelOccurrencesInStores = externalLabelOccurrencesInStores - s.storeNodeConnections.Set(float64(len(s.stores))) - s.cleanUpStoreStatuses() + s.storesMetric.Update(stats) + s.storesMtx.Lock() + s.stores = stores + s.storesMtx.Unlock() + + s.cleanUpStoreStatuses(stores) } -func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef { +func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef { var ( - unique = make(map[string]struct{}) - - healthyStores = make(map[string]*storeRef, len(s.stores)) + unique = make(map[string]struct{}) + healthyStores = make(map[string]*storeRef, len(stores)) mtx sync.Mutex wg sync.WaitGroup ) @@ -300,18 +406,8 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef { ctx, cancel := context.WithTimeout(ctx, s.gRPCInfoCallTimeout) defer cancel() - store, ok := s.stores[addr] - if ok { - // Check existing store. Is it healthy? What are current metadata? - labelSets, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient) - if err != nil { - // Peer unhealthy. Do not include in healthy stores. - s.updateStoreStatus(store, err) - level.Warn(s.logger).Log("msg", "update of store node failed", "err", err, "address", addr) - return - } - store.Update(labelSets, minTime, maxTime) - } else { + st, seenAlready := stores[addr] + if !seenAlready { // New store or was unhealthy and was removed in the past - create new one. conn, err := grpc.DialContext(ctx, addr, s.dialOpts...) if err != nil { @@ -319,53 +415,34 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef { level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr) return } - store = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger} + st = &storeRef{StoreClient: storepb.NewStoreClient(conn), cc: conn, addr: addr, logger: s.logger} + } - // Initial info call for all types of stores to check gRPC StoreAPI. - resp, err := store.StoreClient.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) - if err != nil { - store.close() - s.updateStoreStatus(store, err) - level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "initial store client info fetch"), "address", addr) - return - } - if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 { - resp.LabelSets = []storepb.LabelSet{{Labels: resp.Labels}} + // Check existing or new store. Is it healthy? What are current metadata? + labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient) + if err != nil { + if !seenAlready { + // Close only if new. Unhealthy `s.stores` will be closed later on. + st.Close() } - store.storeType = component.FromProto(resp.StoreType) - store.Update(resp.LabelSets, resp.MinTime, resp.MaxTime) + s.updateStoreStatus(st, err) + level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr) + return } + s.updateStoreStatus(st, nil) + st.Update(labelSets, minTime, maxTime, storeType) mtx.Lock() defer mtx.Unlock() - healthyStores[addr] = store + healthyStores[addr] = st }(storeSpec) } - wg.Wait() return healthyStores } -func externalLabelsFromStore(store *storeRef) string { - tsdbLabelSetStrings := make([]string, 0, len(store.labelSets)) - for _, ls := range store.labelSets { - tsdbLabels := labels.Labels{} - for _, l := range ls.Labels { - tsdbLabels = append(tsdbLabels, labels.Label{ - Name: l.Name, - Value: l.Value, - }) - } - sort.Sort(tsdbLabels) - tsdbLabelSetStrings = append(tsdbLabelSetStrings, tsdbLabels.String()) - } - sort.Strings(tsdbLabelSetStrings) - - return strings.Join(tsdbLabelSetStrings, ",") -} - func (s *StoreSet) updateStoreStatus(store *storeRef, err error) { s.storesStatusesMtx.Lock() defer s.storesStatusesMtx.Unlock() @@ -380,10 +457,12 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) { status.LastCheck = time.Now() if err == nil { - status.LabelSets = store.labelSets - status.StoreType = store.storeType - status.MinTime = store.minTime - status.MaxTime = store.maxTime + + mint, maxt := store.TimeRange() + status.LabelSets = store.LabelSets() + status.StoreType = store.StoreType() + status.MinTime = mint + status.MaxTime = maxt } s.storeStatuses[store.addr] = &status @@ -404,22 +483,10 @@ func (s *StoreSet) GetStoreStatus() []StoreStatus { return statuses } -func (s *StoreSet) externalLabelOccurrences() map[string]int { - s.mtx.RLock() - defer s.mtx.RUnlock() - - r := make(map[string]int, len(s.externalLabelOccurrencesInStores)) - for k, v := range s.externalLabelOccurrencesInStores { - r[k] = v - } - - return r -} - // Get returns a list of all active stores. func (s *StoreSet) Get() []store.Client { - s.mtx.RLock() - defer s.mtx.RUnlock() + s.storesMtx.RLock() + defer s.storesMtx.RUnlock() stores := make([]store.Client, 0, len(s.stores)) for _, st := range s.stores { @@ -429,21 +496,27 @@ func (s *StoreSet) Get() []store.Client { } func (s *StoreSet) Close() { + s.storesMtx.Lock() + defer s.storesMtx.Unlock() + for _, st := range s.stores { - st.close() + st.Close() } + s.stores = map[string]*storeRef{} } -func (s *StoreSet) cleanUpStoreStatuses() { +func (s *StoreSet) cleanUpStoreStatuses(stores map[string]*storeRef) { s.storesStatusesMtx.Lock() defer s.storesStatusesMtx.Unlock() now := time.Now() for addr, status := range s.storeStatuses { - if _, ok := s.stores[addr]; !ok { - if now.Sub(status.LastCheck) >= s.unhealthyStoreTimeout { - delete(s.storeStatuses, addr) - } + if _, ok := stores[addr]; ok { + continue + } + + if now.Sub(status.LastCheck) >= s.unhealthyStoreTimeout { + delete(s.storeStatuses, addr) } } } diff --git a/pkg/query/storeset_test.go b/pkg/query/storeset_test.go index 9f5cc30689..54f9d0886f 100644 --- a/pkg/query/storeset_test.go +++ b/pkg/query/storeset_test.go @@ -5,16 +5,12 @@ import ( "fmt" "math" "net" - "os" "testing" "time" - "sort" - "github.com/fortytw2/leaktest" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" "google.golang.org/grpc" @@ -27,11 +23,6 @@ var testGRPCOpts = []grpc.DialOption{ grpc.WithInsecure(), } -var ( - emptyStoresExtLabels [][]storepb.Label - emptyStoresTypes []component.StoreAPI -) - type testStore struct { info storepb.InfoResponse } @@ -56,76 +47,54 @@ func (s *testStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return nil, status.Error(codes.Unimplemented, "not implemented") } +type testStoreMeta struct { + extlsetFn func(addr string) []storepb.LabelSet + storeType component.StoreAPI +} + type testStores struct { - srvs map[string]*grpc.Server + srvs map[string]*grpc.Server + orderAddrs []string } -func newTestStores(numStores int, storesExtLabels [][]storepb.Label, storesTypes []component.StoreAPI) (*testStores, error) { +func startTestStores(storeMetas []testStoreMeta) (*testStores, error) { st := &testStores{ srvs: map[string]*grpc.Server{}, } - for i := 0; i < numStores; i++ { - lsetFn := func(addr string) []storepb.LabelSet { - if len(storesExtLabels) != numStores { - return []storepb.LabelSet{{ - Labels: []storepb.Label{ - { - Name: "addr", - Value: addr, - }, - }, - }} - } - ls := storesExtLabels[i] - if len(ls) == 0 { - return []storepb.LabelSet{} - } - - return []storepb.LabelSet{{Labels: storesExtLabels[i]}} - } - - storeTypeFn := func() storepb.StoreType { - if len(storesTypes) != numStores { - return component.Sidecar.ToProto() - } - st := storesTypes[i] - return st.ToProto() - } - - srv, addr, err := startStore(lsetFn, storeTypeFn) + for _, meta := range storeMetas { + listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { // Close so far started servers. st.Close() return nil, err } - st.srvs[addr] = srv - } + srv := grpc.NewServer() - return st, nil -} + storeSrv := &testStore{ + info: storepb.InfoResponse{ + LabelSets: meta.extlsetFn(listener.Addr().String()), + }, + } + if meta.storeType != nil { + storeSrv.info.StoreType = meta.storeType.ToProto() + } + storepb.RegisterStoreServer(srv, storeSrv) + go func() { + _ = srv.Serve(listener) + }() -func startStore(lsetFn func(addr string) []storepb.LabelSet, storeTypeFn func() storepb.StoreType) (*grpc.Server, string, error) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return nil, "", err + st.srvs[listener.Addr().String()] = srv + st.orderAddrs = append(st.orderAddrs, listener.Addr().String()) } - srv := grpc.NewServer() - storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{LabelSets: lsetFn(listener.Addr().String()), StoreType: storeTypeFn()}}) - go func() { - _ = srv.Serve(listener) - }() - - return srv, listener.Addr().String(), nil + return st, nil } func (s *testStores) StoreAddresses() []string { var stores []string - for addr := range s.srvs { - stores = append(stores, addr) - } + stores = append(stores, s.orderAddrs...) return stores } @@ -146,92 +115,402 @@ func (s *testStores) CloseOne(addr string) { delete(s.srvs, addr) } -func specsFromAddrFunc(addrs []string) func() []StoreSpec { - return func() (specs []StoreSpec) { - for _, addr := range addrs { - specs = append(specs, NewGRPCStoreSpec(addr)) - } - return specs - } -} - -func TestStoreSet_AllAvailable_ThenDown(t *testing.T) { +func TestStoreSet_Update(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() - st, err := newTestStores(2, emptyStoresExtLabels, emptyStoresTypes) + stores, err := startTestStores([]testStoreMeta{ + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "addr", Value: addr}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "a", Value: "b"}, + }, + }, + } + }, + }, + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "addr", Value: addr}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "a", Value: "b"}, + }, + }, + } + }, + }, + { + storeType: component.Query, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "a", Value: "broken"}, + }, + }, + } + }, + }, + }) testutil.Ok(t, err) - defer st.Close() + defer stores.Close() - initialStoreAddr := st.StoreAddresses() + discoveredStoreAddr := stores.StoreAddresses() + + // Start with one not available. + stores.CloseOne(discoveredStoreAddr[2]) // Testing if duplicates can cause weird results. - initialStoreAddr = append(initialStoreAddr, initialStoreAddr[0]) - storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute) + discoveredStoreAddr = append(discoveredStoreAddr, discoveredStoreAddr[0]) + storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { + for _, addr := range discoveredStoreAddr { + specs = append(specs, NewGRPCStoreSpec(addr)) + } + return specs + }, testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second defer storeSet.Close() // Should not matter how many of these we run. storeSet.Update(context.Background()) storeSet.Update(context.Background()) + testutil.Equals(t, 2, len(storeSet.stores)) + testutil.Equals(t, 3, len(storeSet.storeStatuses)) + + for addr, st := range storeSet.stores { + testutil.Equals(t, addr, st.addr) + + lset := st.LabelSets() + testutil.Equals(t, 2, len(lset)) + testutil.Equals(t, "addr", lset[0].Labels[0].Name) + testutil.Equals(t, addr, lset[0].Labels[0].Value) + testutil.Equals(t, "a", lset[1].Labels[0].Name) + testutil.Equals(t, "b", lset[1].Labels[0].Value) + } - testutil.Assert(t, len(storeSet.stores) == 2, "all services should respond just fine, so we expect all clients to be ready.") - - for addr, store := range storeSet.stores { - testutil.Equals(t, addr, store.addr) - testutil.Equals(t, 1, len(store.labelSets)) - testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name) - testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value) + // Check stats. + expected := newStoreAPIStats() + expected[component.Sidecar] = map[string]int{ + fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[0]): 1, + fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[1]): 1, } + testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) - st.CloseOne(initialStoreAddr[0]) + // Remove address from discovered and reset last check, which should ensure cleanup of status on next update. + storeSet.storeStatuses[discoveredStoreAddr[2]].LastCheck = time.Now().Add(-4 * time.Minute) + discoveredStoreAddr = discoveredStoreAddr[:len(discoveredStoreAddr)-2] + storeSet.Update(context.Background()) + testutil.Equals(t, 2, len(storeSet.storeStatuses)) + + stores.CloseOne(discoveredStoreAddr[0]) + delete(expected[component.Sidecar], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[0])) // We expect Update to tear down store client for closed store server. storeSet.Update(context.Background()) + testutil.Equals(t, 1, len(storeSet.stores), "only one service should respond just fine, so we expect one client to be ready.") + testutil.Equals(t, 2, len(storeSet.storeStatuses)) - testutil.Assert(t, len(storeSet.stores) == 1, "only one service should respond just fine, so we expect one client to be ready.") - - addr := initialStoreAddr[1] - store, ok := storeSet.stores[addr] + addr := discoveredStoreAddr[1] + st, ok := storeSet.stores[addr] testutil.Assert(t, ok, "addr exist") - testutil.Equals(t, addr, store.addr) - testutil.Equals(t, 1, len(store.labelSets)) - testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name) - testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value) -} - -func TestStoreSet_StaticStores_OneAvailable(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() - - st, err := newTestStores(2, emptyStoresExtLabels, emptyStoresTypes) + testutil.Equals(t, addr, st.addr) + + lset := st.LabelSets() + testutil.Equals(t, 2, len(lset)) + testutil.Equals(t, "addr", lset[0].Labels[0].Name) + testutil.Equals(t, addr, lset[0].Labels[0].Value) + testutil.Equals(t, "a", lset[1].Labels[0].Name) + testutil.Equals(t, "b", lset[1].Labels[0].Value) + testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) + + // New big batch of storeAPIs. + stores2, err := startTestStores([]testStoreMeta{ + { + storeType: component.Query, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "l3", Value: "v4"}, + }, + }, + } + }, + }, + { + // Duplicated Querier, in previous versions it would be deduplicated. Now it should be not. + storeType: component.Query, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "l3", Value: "v4"}, + }, + }, + } + }, + }, + { + storeType: component.Sidecar, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + } + }, + }, + { + // Duplicated Sidecar, in previous versions it would be deduplicated. Now it should be not. + storeType: component.Sidecar, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + } + }, + }, + { + // Querier that duplicates with sidecar, in previous versions it would be deduplicated. Now it should be not. + storeType: component.Query, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + } + }, + }, + { + // Ruler that duplicates with sidecar, in previous versions it would be deduplicated. Now it should be not. + // Warning should be produced. + storeType: component.Rule, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + } + }, + }, + { + // Duplicated Rule, in previous versions it would be deduplicated. Now it should be not. Warning should be produced. + storeType: component.Rule, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + } + }, + }, + { + // No storeType. + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "no-store-type"}, + {Name: "l2", Value: "v3"}, + }, + }, + } + }, + }, + // Two pre v0.8.0 store gateway nodes, they don't have ext labels set. + { + storeType: component.Store, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{} + }, + }, + { + storeType: component.Store, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{} + }, + }, + // Regression tests against https://github.com/thanos-io/thanos/issues/1632: From v0.8.0 stores advertise labels. + // If the object storage handled by store gateway has only one sidecar we used to hitting issue. + { + storeType: component.Store, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "l3", Value: "v4"}, + }, + }, + } + }, + }, + // Stores v0.8.1 has compatibility labels. Check if they are correctly removed. + { + storeType: component.Store, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "l3", Value: "v4"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: store.CompatibilityTypeLabelName, Value: "store"}, + }, + }, + } + }, + }, + // Duplicated store, in previous versions it would be deduplicated. Now it should be not. + { + storeType: component.Store, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "l1", Value: "v2"}, + {Name: "l2", Value: "v3"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "l3", Value: "v4"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: store.CompatibilityTypeLabelName, Value: "store"}, + }, + }, + } + }, + }, + }) testutil.Ok(t, err) - defer st.Close() + defer stores2.Close() - initialStoreAddr := st.StoreAddresses() - st.CloseOne(initialStoreAddr[0]) + discoveredStoreAddr = append(discoveredStoreAddr, stores2.StoreAddresses()...) - storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute) - storeSet.gRPCInfoCallTimeout = 2 * time.Second - defer storeSet.Close() - - // Should not matter how many of these we run. - storeSet.Update(context.Background()) + // New stores should be loaded. storeSet.Update(context.Background()) + testutil.Equals(t, 1+len(stores2.srvs), len(storeSet.stores)) - testutil.Assert(t, len(storeSet.stores) == 1, "only one service should respond just fine, so we expect one client to be ready.") + // Check stats. + expected = newStoreAPIStats() + expected[component.StoreAPI(nil)] = map[string]int{ + "{l1=\"no-store-type\",l2=\"v3\"}": 1, + } + expected[component.Query] = map[string]int{ + "{l1=\"v2\",l2=\"v3\"}": 1, + "{l1=\"v2\",l2=\"v3\"},{l3=\"v4\"}": 2, + } + expected[component.Rule] = map[string]int{ + "{l1=\"v2\",l2=\"v3\"}": 2, + } + expected[component.Sidecar] = map[string]int{ + fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[1]): 1, + "{l1=\"v2\",l2=\"v3\"}": 2, + } + expected[component.Store] = map[string]int{ + "": 2, + "{l1=\"v2\",l2=\"v3\"},{l3=\"v4\"}": 3, + } + testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) - addr := initialStoreAddr[1] - store, ok := storeSet.stores[addr] - testutil.Assert(t, ok, "addr exist") - testutil.Equals(t, addr, store.addr) - testutil.Equals(t, 1, len(store.labelSets)) - testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name) - testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value) + // Check statuses. + testutil.Equals(t, 2+len(stores2.srvs), len(storeSet.storeStatuses)) } -func TestStoreSet_StaticStores_NoneAvailable(t *testing.T) { +func TestStoreSet_Update_NoneAvailable(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() - st, err := newTestStores(2, emptyStoresExtLabels, emptyStoresTypes) + st, err := startTestStores([]testStoreMeta{ + { + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + { + Name: "addr", + Value: addr, + }, + }, + }, + } + }, + storeType: component.Sidecar, + }, + { + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + { + Name: "addr", + Value: addr, + }, + }, + }, + } + }, + storeType: component.Sidecar, + }, + }) testutil.Ok(t, err) defer st.Close() @@ -239,7 +518,12 @@ func TestStoreSet_StaticStores_NoneAvailable(t *testing.T) { st.CloseOne(initialStoreAddr[0]) st.CloseOne(initialStoreAddr[1]) - storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute) + storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { + for _, addr := range initialStoreAddr { + specs = append(specs, NewGRPCStoreSpec(addr)) + } + return specs + }, testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second // Should not matter how many of these we run. @@ -249,80 +533,6 @@ func TestStoreSet_StaticStores_NoneAvailable(t *testing.T) { // Leak test will ensure that we don't keep client connection around. -} - -func TestStoreSet_AllAvailable_BlockExtLsetDuplicates(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() - - storeExtLabels := [][]storepb.Label{ - { - {Name: "l1", Value: "v1"}, - }, - { - {Name: "l1", Value: "v2"}, - {Name: "l2", Value: "v3"}, - }, - { - // Duplicate with above. - {Name: "l1", Value: "v2"}, - {Name: "l2", Value: "v3"}, - }, - // Two store nodes, they don't have ext labels set. - nil, - nil, - { - // Duplicate with two others. - {Name: "l1", Value: "v2"}, - {Name: "l2", Value: "v3"}, - }, - } - - storeTypes := []component.StoreAPI{ - component.Query, - component.Sidecar, - component.Query, - component.Store, - component.Store, - component.Sidecar, - } - - st, err := newTestStores(6, storeExtLabels, storeTypes) - testutil.Ok(t, err) - defer st.Close() - - logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - logger = level.NewFilter(logger, level.AllowDebug()) - logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) - storeSet := NewStoreSet(logger, nil, specsFromAddrFunc(st.StoreAddresses()), testGRPCOpts, time.Minute) - storeSet.gRPCInfoCallTimeout = 2 * time.Second - defer storeSet.Close() - - // Should not matter how many of these we run. - storeSet.Update(context.Background()) - storeSet.Update(context.Background()) - storeSet.Update(context.Background()) - storeSet.Update(context.Background()) - - testutil.Assert(t, len(storeSet.stores) == 4, fmt.Sprintf("all services should respond just fine, but we expect duplicates being blocked. Expected %d stores, got %d", 4, len(storeSet.stores))) - - // Sort result to be able to compare. - var existingStoreLabels [][]storepb.Label - for _, store := range storeSet.stores { - for _, ls := range store.LabelSets() { - existingStoreLabels = append(existingStoreLabels, ls.Labels) - } - } - sort.Slice(existingStoreLabels, func(i, j int) bool { - return len(existingStoreLabels[i]) > len(existingStoreLabels[j]) - }) - - testutil.Equals(t, [][]storepb.Label{ - { - {Name: "l1", Value: "v2"}, - {Name: "l2", Value: "v3"}, - }, - { - {Name: "l1", Value: "v1"}, - }, - }, existingStoreLabels) + expected := newStoreAPIStats() + testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a1f1a7d395..d5cdf655de 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -46,15 +46,29 @@ import ( "google.golang.org/grpc/status" ) -// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed -// for precalculating the number of samples that we may have to retrieve and decode for any given query -// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know -// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way -// because you barely get any improvements in compression when the number of samples is beyond this. -// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. -const maxSamplesPerChunk = 120 - -const maxChunkSize = 16000 +const ( + // maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed + // for precalculating the number of samples that we may have to retrieve and decode for any given query + // without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know + // where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way + // because you barely get any improvements in compression when the number of samples is beyond this. + // Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. + maxSamplesPerChunk = 120 + + maxChunkSize = 16000 + + // CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility + // with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels. + // Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where + // Querier was blocking Store Gateway as duplicate with sidecar. + // + // Newer Queriers are not strict, no duplicated external labels check is there anymore. + // Additionally newer Queriers removes/ignore this exact labels from UI and querying. + // + // This label name is intentionally against Prometheus label style. + // TODO(bwplotka): Remove it at some point. + CompatibilityTypeLabelName = "@thanos_compatibility_store_type" +) type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge @@ -222,7 +236,8 @@ type BucketStore struct { filterConfig *FilterConfig relabelConfig []*relabel.Config - labelSets map[uint64]labels.Labels + labelSets map[uint64]labels.Labels + enableCompatibilityLabel bool } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -240,6 +255,7 @@ func NewBucketStore( blockSyncConcurrency int, filterConf *FilterConfig, relabelConfig []*relabel.Config, + enableCompatibilityLabel bool, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -271,10 +287,11 @@ func NewBucketStore( maxConcurrent, extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), ), - samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), - partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, - filterConfig: filterConf, - relabelConfig: relabelConfig, + samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), + partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, + filterConfig: filterConf, + relabelConfig: relabelConfig, + enableCompatibilityLabel: enableCompatibilityLabel, } s.metrics = metrics @@ -554,7 +571,7 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info s.mtx.RLock() res.LabelSets = make([]storepb.LabelSet, 0, len(s.labelSets)) for _, ls := range s.labelSets { - lset := []storepb.Label{} + lset := make([]storepb.Label, 0, len(ls)) for _, l := range ls { lset = append(lset, storepb.Label{Name: l.Name, Value: l.Value}) } @@ -562,6 +579,11 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info } s.mtx.RUnlock() + if s.enableCompatibilityLabel && len(res.LabelSets) > 0 { + // This is for compatibility with Querier v0.7.0. + // See query.StoreCompatibilityTypeLabelName comment for details. + res.LabelSets = append(res.LabelSets, storepb.LabelSet{Labels: []storepb.Label{{Name: CompatibilityTypeLabelName, Value: "store"}}}) + } return res, nil } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 433a549867..252f3e5b93 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -151,7 +151,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m maxTime: maxTime, } - store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf, relabelConfig) + store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf, relabelConfig, true) testutil.Ok(t, err) s.store = store @@ -485,7 +485,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, - }, emptyRelabelConfig) + }, emptyRelabelConfig, true) testutil.Ok(t, err) err = store.SyncBlocks(ctx) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 2c984af976..c285df4ed4 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -430,7 +430,21 @@ func TestBucketStore_Info(t *testing.T) { dir, err := ioutil.TempDir("", "bucketstore-test") testutil.Ok(t, err) - bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, emptyRelabelConfig) + bucketStore, err := NewBucketStore( + nil, + nil, + nil, + dir, + noopCache{}, + 2e5, + 0, + 0, + false, + 20, + filterConf, + emptyRelabelConfig, + true, + ) testutil.Ok(t, err) resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) @@ -439,6 +453,8 @@ func TestBucketStore_Info(t *testing.T) { testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) testutil.Equals(t, int64(math.MaxInt64), resp.MinTime) testutil.Equals(t, int64(math.MinInt64), resp.MaxTime) + testutil.Equals(t, []storepb.LabelSet{}, resp.LabelSets) + testutil.Equals(t, []storepb.Label(nil), resp.Labels) } func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { @@ -488,7 +504,7 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: hourBefore, - }, emptyRelabelConfig) + }, emptyRelabelConfig, true) testutil.Ok(t, err) inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1) @@ -558,7 +574,7 @@ func TestBucketStore_selectorBlocks(t *testing.T) { testutil.Ok(t, err) bucketStore, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 0, 0, 20, false, 20, - filterConf, relabelConf) + filterConf, relabelConf, true) testutil.Ok(t, err) for _, id := range []ulid.ULID{id1, id2, id3} { @@ -614,7 +630,21 @@ func TestBucketStore_InfoWithLabels(t *testing.T) { var relabelConfig []*relabel.Config err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig) testutil.Ok(t, err) - bucketStore, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, relabelConfig) + bucketStore, err := NewBucketStore( + nil, + nil, + bkt, + dir, + noopCache{}, + 2e5, + 0, + 0, + false, + 20, + filterConf, + relabelConfig, + true, + ) testutil.Ok(t, err) err = bucketStore.SyncBlocks(ctx) @@ -626,13 +656,16 @@ func TestBucketStore_InfoWithLabels(t *testing.T) { testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) testutil.Equals(t, int64(0), resp.MinTime) testutil.Equals(t, int64(1000), resp.MaxTime) + testutil.Equals(t, []storepb.Label(nil), resp.Labels) testutil.Equals(t, []storepb.LabelSet{ - storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "B"}, + }, + }, + { Labels: []storepb.Label{ - { - Name: "cluster", - Value: "B", - }, + {Name: CompatibilityTypeLabelName, Value: "store"}, }, }, }, resp.LabelSets)