Skip to content

Commit

Permalink
Receive: dont rely on slice labels
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Jan 29, 2024
1 parent 6a0a491 commit d700c5e
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 140 deletions.
3 changes: 2 additions & 1 deletion pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)

var (
Expand Down Expand Up @@ -67,7 +68,7 @@ type HashringConfig struct {
Tenants []string `json:"tenants,omitempty"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
ExternalLabels labels.Labels `json:"external_labels,omitempty"`
}

// ConfigWatcher is able to watch a file containing a hashring configuration
Expand Down
8 changes: 1 addition & 7 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,7 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist
// Test that each time series is stored
// the correct amount of times in each fake DB.
for _, ts := range tc.wreq.Timeseries {
lset := make(labels.Labels, len(ts.Labels))
for j := range ts.Labels {
lset[j] = labels.Label{
Name: ts.Labels[j].Name,
Value: ts.Labels[j].Value,
}
}
lset := labelpb.ZLabelsToPromLabels(ts.Labels)
for j, a := range tc.appendables {
if withConsistencyDelay {
var expected int
Expand Down
86 changes: 11 additions & 75 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,28 @@ import (
"path"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -572,12 +569,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
reg = NewUnRegisterer(reg)

initialLset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))

lset, err := t.extractTenantsLabels(tenantID, initialLset)
if err != nil {
return err
}

lset := t.extractTenantsLabels(tenantID, initialLset)
dataDir := t.defaultTenantDataDir(tenantID)

level.Info(logger).Log("msg", "opening TSDB")
Expand Down Expand Up @@ -686,14 +678,7 @@ func (t *MultiTSDB) SetHashringConfig(cfg []HashringConfig) error {
updatedTenants = append(updatedTenants, tenantID)

lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))

if hc.ExternalLabels != nil {
extendedLset, err := extendLabels(lset, hc.ExternalLabels, t.logger)
if err != nil {
return errors.Wrap(err, "failed to extend external labels for tenant "+tenantID)
}
lset = extendedLset
}
lset = labelpb.ExtendSortedLabels(hc.ExternalLabels, lset)

if t.tenants[tenantID].ship != nil {
t.tenants[tenantID].ship.SetLabels(lset)
Expand Down Expand Up @@ -865,63 +850,14 @@ func (u *UnRegisterer) MustRegister(cs ...prometheus.Collector) {
// extractTenantsLabels extracts tenant's external labels from hashring configs.
// If one tenant appears in multiple hashring configs,
// only the external label set from the first hashring config is applied.
func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Labels) (labels.Labels, error) {
func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Labels) labels.Labels {
for _, hc := range t.hashringConfigs {
for _, tenant := range hc.Tenants {
if tenant != tenantID {
continue
}

if hc.ExternalLabels != nil {
extendedLset, err := extendLabels(initialLset, hc.ExternalLabels, t.logger)
if err != nil {
return nil, errors.Wrap(err, "failed to extend external labels for tenant "+tenantID)
}
return extendedLset, nil
}

return initialLset, nil
return labelpb.ExtendSortedLabels(hc.ExternalLabels, initialLset)
}
}

return initialLset, nil
}

// extendLabels extends external labels of the initial label set.
// If an external label shares same name with a label in the initial label set,
// use the label in the initial label set and inform user about it.
func extendLabels(labelSet labels.Labels, extend map[string]string, logger log.Logger) (labels.Labels, error) {
var extendLabels labels.Labels
for name, value := range extend {
if !model.LabelName.IsValid(model.LabelName(name)) {
return nil, errors.Errorf("unsupported format for label's name: %s", name)
}
extendLabels = append(extendLabels, labels.Label{Name: name, Value: value})
}

sort.Sort(labelSet)
sort.Sort(extendLabels)

extendedLabelSet := make(labels.Labels, 0, len(labelSet)+len(extendLabels))
for len(labelSet) > 0 && len(extendLabels) > 0 {
d := strings.Compare(labelSet[0].Name, extendLabels[0].Name)
if d == 0 {
extendedLabelSet = append(extendedLabelSet, labelSet[0])
level.Info(logger).Log("msg", "Duplicate label found. Using initial label instead.",
"label's name", extendLabels[0].Name)
labelSet, extendLabels = labelSet[1:], extendLabels[1:]
} else if d < 0 {
extendedLabelSet = append(extendedLabelSet, labelSet[0])
labelSet = labelSet[1:]
} else if d > 0 {
extendedLabelSet = append(extendedLabelSet, extendLabels[0])
extendLabels = extendLabels[1:]
}
}
extendedLabelSet = append(extendedLabelSet, labelSet...)
extendedLabelSet = append(extendedLabelSet, extendLabels...)

sort.Sort(extendedLabelSet)

return extendedLabelSet, nil
return initialLset
}
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func queryLabelValues(ctx context.Context, m *MultiTSDB) error {
clients[0] = &slowClient{clients[0]}
}
return clients
}, component.Store, nil, 1*time.Minute, store.LazyRetrieval)
}, component.Store, labels.EmptyLabels(), 1*time.Minute, store.LazyRetrieval)

req := &storepb.LabelValuesRequest{
Label: labels.MetricName,
Expand Down
Loading

0 comments on commit d700c5e

Please sign in to comment.