Skip to content

Commit

Permalink
*: Allow exposing multiple label-sets
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Jun 28, 2019
1 parent 2431318 commit e900615
Show file tree
Hide file tree
Showing 16 changed files with 1,221 additions and 562 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ require (
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
go.uber.org/atomic v1.4.0 // indirect
golang.org/x/net v0.0.0-20190522155817-f3200d17e092
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2
Expand Down
97 changes: 53 additions & 44 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -32,14 +33,14 @@ type StoreSpec interface {
// If metadata call fails we assume that store is no longer accessible and we should not use it.
// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
// given store connection.
Metadata(ctx context.Context, client storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error)
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error)
}

type StoreStatus struct {
Name string
LastCheck time.Time
LastError error
Labels []storepb.Label
LabelSets []storepb.LabelSet
StoreType component.StoreAPI
MinTime int64
MaxTime int64
Expand All @@ -62,12 +63,12 @@ func (s *grpcStoreSpec) Addr() string {

// Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after
// that time, we assume that the host is unhealthy and return error.
func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error) {
func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error) {
resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.FailFast(false))
if err != nil {
return nil, 0, 0, errors.Wrapf(err, "fetching store info from %s", s.addr)
}
return resp.Labels, resp.MinTime, resp.MaxTime, nil
return resp.LabelSets, resp.MinTime, resp.MaxTime, nil
}

// StoreSet maintains a set of active stores. It is backed up by Store Specifications that are dynamically fetched on
Expand All @@ -81,13 +82,13 @@ type StoreSet struct {
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration

mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelStores map[string]int
storeStatuses map[string]*StoreStatus
unhealthyStoreTimeout time.Duration
mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelOccurrencesInStores map[string]int
storeStatuses map[string]*StoreStatus
unhealthyStoreTimeout time.Duration
}

type storeSetNodeCollector struct {
Expand Down Expand Up @@ -135,12 +136,12 @@ func NewStoreSet(
}

ss := &StoreSet{
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelStores: map[string]int{},
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelOccurrencesInStores: map[string]int{},
stores: make(map[string]*storeRef),
storeStatuses: make(map[string]*StoreStatus),
unhealthyStoreTimeout: unhealthyStoreTimeout,
Expand All @@ -162,27 +163,27 @@ type storeRef struct {
addr string

// Meta (can change during runtime).
labels []storepb.Label
labelSets []storepb.LabelSet
storeType component.StoreAPI
minTime int64
maxTime int64

logger log.Logger
}

func (s *storeRef) Update(labels []storepb.Label, minTime int64, maxTime int64) {
s.mtx.RLock()
defer s.mtx.RUnlock()
func (s *storeRef) Update(labelSets []storepb.LabelSet, minTime int64, maxTime int64) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labels = labels
s.labelSets = labelSets
s.minTime = minTime
s.maxTime = maxTime
}

func (s *storeRef) Labels() []storepb.Label {
func (s *storeRef) LabelSets() []storepb.LabelSet {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.labels
return s.labelSets
}

func (s *storeRef) TimeRange() (int64, int64) {
Expand All @@ -194,7 +195,7 @@ func (s *storeRef) TimeRange() (int64, int64) {

func (s *storeRef) String() string {
mint, maxt := s.TimeRange()
return fmt.Sprintf("Addr: %s Labels: %v Mint: %d Maxt: %d", s.addr, s.Labels(), mint, maxt)
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, storepb.LabelSetsToString(s.LabelSets()), mint, maxt)
}

func (s *storeRef) Addr() string {
Expand All @@ -211,10 +212,11 @@ func (s *StoreSet) Update(ctx context.Context) {
healthyStores := s.getHealthyStores(ctx)

// Record the number of occurrences of external label combinations for current store slice.
externalLabelStores := map[string]int{}
externalLabelOccurrencesInStores := map[string]int{}
for _, st := range healthyStores {
externalLabelStores[externalLabelsFromStore(st)]++
externalLabelOccurrencesInStores[externalLabelsFromStore(st)]++
}
level.Debug(s.logger).Log("msg", "updating healthy stores", "externalLabelOccurrencesInStores", fmt.Sprintf("%#+v", externalLabelOccurrencesInStores))

s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -238,10 +240,12 @@ func (s *StoreSet) Update(ctx context.Context) {
// No external labels means strictly store gateway or ruler and it is fine to have access to multiple instances of them.
//
// Sidecar will error out if it will be configured with empty external labels.
if len(store.Labels()) > 0 && externalLabelStores[externalLabelsFromStore(store)] != 1 {
externalLabels := externalLabelsFromStore(store)
storesWithExternalLabels := externalLabelOccurrencesInStores[externalLabels]
if len(store.LabelSets()) > 0 && storesWithExternalLabels != 1 {
store.close()
s.updateStoreStatus(store, errors.New(droppingStoreMessage))
level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr)
level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr, "extLset", externalLabels, "duplicates", storesWithExternalLabels)
continue
}

Expand All @@ -254,7 +258,7 @@ func (s *StoreSet) Update(ctx context.Context) {
s.updateStoreStatus(store, nil)
level.Info(s.logger).Log("msg", "adding new store to query storeset", "address", addr)
}
s.externalLabelStores = externalLabelStores
s.externalLabelOccurrencesInStores = externalLabelOccurrencesInStores
s.storeNodeConnections.Set(float64(len(s.stores)))
s.cleanUpStoreStatuses()
}
Expand Down Expand Up @@ -288,14 +292,14 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
store, ok := s.stores[addr]
if ok {
// Check existing store. Is it healthy? What are current metadata?
labels, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient)
labelSets, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient)
if err != nil {
// Peer unhealthy. Do not include in healthy stores.
s.updateStoreStatus(store, err)
level.Warn(s.logger).Log("msg", "update of store node failed", "err", err, "address", addr)
return
}
store.Update(labels, minTime, maxTime)
store.Update(labelSets, minTime, maxTime)
} else {
// New store or was unhealthy and was removed in the past - create new one.
conn, err := grpc.DialContext(ctx, addr, s.dialOpts...)
Expand All @@ -315,7 +319,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
return
}
store.storeType = component.FromProto(resp.StoreType)
store.Update(resp.Labels, resp.MinTime, resp.MaxTime)
store.Update(resp.LabelSets, resp.MinTime, resp.MaxTime)
}

mtx.Lock()
Expand All @@ -331,16 +335,21 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
}

func externalLabelsFromStore(store *storeRef) string {
tsdbLabels := labels.Labels{}
for _, l := range store.labels {
tsdbLabels = append(tsdbLabels, labels.Label{
Name: l.Name,
Value: l.Value,
})
tsdbLabelSetStrings := make([]string, 0, len(store.labelSets))
for _, ls := range store.labelSets {
tsdbLabels := labels.Labels{}
for _, l := range ls.Labels {
tsdbLabels = append(tsdbLabels, labels.Label{
Name: l.Name,
Value: l.Value,
})
}
sort.Sort(tsdbLabels)
tsdbLabelSetStrings = append(tsdbLabelSetStrings, tsdbLabels.String())
}
sort.Sort(tsdbLabels)
sort.Strings(tsdbLabelSetStrings)

return tsdbLabels.String()
return strings.Join(tsdbLabelSetStrings, ",")
}

func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
Expand All @@ -357,7 +366,7 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
status.LastCheck = time.Now()

if err == nil {
status.Labels = store.labels
status.LabelSets = store.labelSets
status.StoreType = store.storeType
status.MinTime = store.minTime
status.MaxTime = store.maxTime
Expand Down Expand Up @@ -385,8 +394,8 @@ func (s *StoreSet) externalLabelOccurrences() map[string]int {
s.mtx.RLock()
defer s.mtx.RUnlock()

r := make(map[string]int, len(s.externalLabelStores))
for k, v := range s.externalLabelStores {
r := make(map[string]int, len(s.externalLabelOccurrencesInStores))
for k, v := range s.externalLabelOccurrencesInStores {
r[k] = v
}

Expand Down
59 changes: 38 additions & 21 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package query

import (
"context"
"fmt"
"math"
"net"
"os"
"testing"
"time"

"sort"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/testutil"
"google.golang.org/grpc"
Expand Down Expand Up @@ -56,17 +60,23 @@ func newTestStores(numStores int, storesExtLabels ...[]storepb.Label) (*testStor
}

for i := 0; i < numStores; i++ {
lsetFn := func(addr string) []storepb.Label {
lsetFn := func(addr string) []storepb.LabelSet {
if len(storesExtLabels) != numStores {
return []storepb.Label{
{
Name: "addr",
Value: addr,
return []storepb.LabelSet{{
Labels: []storepb.Label{
{
Name: "addr",
Value: addr,
},
},
}
}}
}
ls := storesExtLabels[i]
if len(ls) == 0 {
return []storepb.LabelSet{}
}

return storesExtLabels[i]
return []storepb.LabelSet{{Labels: storesExtLabels[i]}}
}

srv, addr, err := startStore(lsetFn)
Expand All @@ -82,14 +92,14 @@ func newTestStores(numStores int, storesExtLabels ...[]storepb.Label) (*testStor
return st, nil
}

func startStore(lsetFn func(addr string) []storepb.Label) (*grpc.Server, string, error) {
func startStore(lsetFn func(addr string) []storepb.LabelSet) (*grpc.Server, string, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, "", err
}

srv := grpc.NewServer()
storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{Labels: lsetFn(listener.Addr().String())}})
storepb.RegisterStoreServer(srv, &testStore{info: storepb.InfoResponse{LabelSets: lsetFn(listener.Addr().String())}})
go func() {
_ = srv.Serve(listener)
}()
Expand Down Expand Up @@ -154,9 +164,9 @@ func TestStoreSet_AllAvailable_ThenDown(t *testing.T) {

for addr, store := range storeSet.stores {
testutil.Equals(t, addr, store.addr)
testutil.Equals(t, 1, len(store.labels))
testutil.Equals(t, "addr", store.labels[0].Name)
testutil.Equals(t, addr, store.labels[0].Value)
testutil.Equals(t, 1, len(store.labelSets))
testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name)
testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value)
}

st.CloseOne(initialStoreAddr[0])
Expand All @@ -170,9 +180,9 @@ func TestStoreSet_AllAvailable_ThenDown(t *testing.T) {
store, ok := storeSet.stores[addr]
testutil.Assert(t, ok, "addr exist")
testutil.Equals(t, addr, store.addr)
testutil.Equals(t, 1, len(store.labels))
testutil.Equals(t, "addr", store.labels[0].Name)
testutil.Equals(t, addr, store.labels[0].Value)
testutil.Equals(t, 1, len(store.labelSets))
testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name)
testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value)
}

func TestStoreSet_StaticStores_OneAvailable(t *testing.T) {
Expand All @@ -199,9 +209,9 @@ func TestStoreSet_StaticStores_OneAvailable(t *testing.T) {
store, ok := storeSet.stores[addr]
testutil.Assert(t, ok, "addr exist")
testutil.Equals(t, addr, store.addr)
testutil.Equals(t, 1, len(store.labels))
testutil.Equals(t, "addr", store.labels[0].Name)
testutil.Equals(t, addr, store.labels[0].Value)
testutil.Equals(t, 1, len(store.labelSets))
testutil.Equals(t, "addr", store.labelSets[0].Labels[0].Name)
testutil.Equals(t, addr, store.labelSets[0].Labels[0].Value)
}

func TestStoreSet_StaticStores_NoneAvailable(t *testing.T) {
Expand Down Expand Up @@ -259,7 +269,10 @@ func TestStoreSet_AllAvailable_BlockExtLsetDuplicates(t *testing.T) {

initialStoreAddr := st.StoreAddresses()

storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = level.NewFilter(logger, level.AllowDebug())
logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
storeSet := NewStoreSet(logger, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
storeSet.gRPCInfoCallTimeout = 2 * time.Second
defer storeSet.Close()

Expand All @@ -269,13 +282,17 @@ func TestStoreSet_AllAvailable_BlockExtLsetDuplicates(t *testing.T) {
storeSet.Update(context.Background())
storeSet.Update(context.Background())

testutil.Assert(t, len(storeSet.stores) == 5-2, "all services should respond just fine, but we expect duplicates being blocked.")
storeNum := len(storeSet.stores)
expectedStoreNum := 5 - 2
testutil.Assert(t, storeNum == expectedStoreNum, fmt.Sprintf("all services should respond just fine, but we expect duplicates being blocked. Expected %d stores, got %d", expectedStoreNum, storeNum))

// Sort result to be able to compare.

var existingStoreLabels [][]storepb.Label
for _, store := range storeSet.stores {
existingStoreLabels = append(existingStoreLabels, store.Labels())
for _, ls := range store.LabelSets() {
existingStoreLabels = append(existingStoreLabels, ls.Labels)
}
}
sort.Slice(existingStoreLabels, func(i, j int) bool {
return len(existingStoreLabels[i]) > len(existingStoreLabels[j])
Expand Down
9 changes: 9 additions & 0 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ func (p *PrometheusStore) Info(ctx context.Context, r *storepb.InfoRequest) (*st
Value: l.Value,
})
}

// Until we deprecate the single labels in the reply, we just duplicate
// them here for migration/compatibility purposes.
res.LabelSets = []storepb.LabelSet{}
if len(res.Labels) > 0 {
res.LabelSets = append(res.LabelSets, storepb.LabelSet{
Labels: res.Labels,
})
}
return res, nil
}

Expand Down
Loading

0 comments on commit e900615

Please sign in to comment.