receive: add support for globbing tenant specifiers #7155

Merged · 1 commit · Feb 27, 2024
docs/components/receive.md (33 additions, 0 deletions)

@@ -95,6 +95,39 @@ The example content of `hashring.json`:

With such a configuration, each receive instance listens for remote-write requests on `<ip>:10908/api/v1/receive` and forwards them to the correct node in the hashring as needed for tenancy and replication.

A hashring in the configuration file can be restricted to match only certain tenants. For example:

```json
[
{
"tenants": ["foobar"],
"endpoints": [
"127.0.0.1:1234",
"127.0.0.1:12345",
"127.0.0.1:1235"
]
}
]
```

The specified endpoints will be used only when the tenant is `foobar`. Glob matching is enabled by setting the `tenant_matcher_type` parameter to `glob`; the strings in the `tenants` array are then treated as glob patterns and matched against the tenant of each remote-write request. For instance:

```json
[
{
"tenants": ["foo*"],
"tenant_matcher_type": "glob",
"endpoints": [
"127.0.0.1:1234",
"127.0.0.1:12345",
"127.0.0.1:1235"
]
}
]
```

This still matches the tenant `foobar`, as well as any other tenant beginning with `foo`.
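The glob semantics are those of Go's `path/filepath.Match`, which this PR uses internally. A minimal standalone sketch (the `matchTenant` helper is illustrative, not part of Thanos):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// matchTenant reports whether tenant matches pattern using the same
// glob semantics as path/filepath.Match: `*` matches any sequence of
// non-separator characters, `?` matches a single character.
func matchTenant(pattern, tenant string) bool {
	ok, err := filepath.Match(pattern, tenant)
	if err != nil {
		// Malformed patterns (e.g. an unterminated character class)
		// simply do not match in this sketch.
		return false
	}
	return ok
}

func main() {
	for _, tenant := range []string{"foobar", "foo", "bar"} {
		fmt.Printf("foo* matches %q: %v\n", tenant, matchTenant("foo*", tenant))
	}
}
```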

### AZ-aware Ketama hashring (experimental)

To ensure an even spread of replicated data across nodes in different availability zones, you can include an availability-zone definition in your hashring config. For example, for a 6-node cluster spread over three availability zones A, B, and C, you could use the following example `hashring.json`:
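The example config itself is truncated in this diff view; a hypothetical sketch, assuming endpoints accept an `az` field as object-form endpoints do in Thanos, might look like:

```json
[
    {
        "endpoints": [
            { "address": "127.0.0.1:10901", "az": "A" },
            { "address": "127.0.0.1:10902", "az": "A" },
            { "address": "127.0.0.1:10903", "az": "B" },
            { "address": "127.0.0.1:10904", "az": "B" },
            { "address": "127.0.0.1:10905", "az": "C" },
            { "address": "127.0.0.1:10906", "az": "C" }
        ]
    }
]
```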
docs/components/rule.md (4 additions, 0 deletions)

@@ -401,6 +401,10 @@ Flags:
--query.default-step=1s Default range query step to use. This is
only used in stateless Ruler and alert state
restoration.
--query.enable-x-functions
Whether to enable extended rate functions
(xrate, xincrease and xdelta). Only has effect
when used with Thanos engine.
--query.http-method=POST HTTP method to use when sending queries.
Possible options: [GET, POST]
--query.sd-dns-interval=30s
pkg/receive/config.go (19 additions, 5 deletions)

@@ -64,11 +64,25 @@ func (e *Endpoint) UnmarshalJSON(data []byte) error {
// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels labels.Labels `json:"external_labels,omitempty"`
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
TenantMatcherType tenantMatcher `json:"tenant_matcher_type,omitempty"`
Endpoints []Endpoint `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels labels.Labels `json:"external_labels,omitempty"`
}

type tenantMatcher string

const (
// TenantMatcherTypeExact matches tenants exactly. This is also the default one.
TenantMatcherTypeExact tenantMatcher = "exact"
// TenantMatcherGlob matches tenants using glob patterns.
TenantMatcherGlob tenantMatcher = "glob"
)

func isExactMatcher(m tenantMatcher) bool {
return m == TenantMatcherTypeExact || m == ""
}

// ConfigWatcher is able to watch a file containing a hashring configuration
Expand Down
pkg/receive/hashring.go (29 additions, 6 deletions)

@@ -6,6 +6,7 @@ package receive
import (
"fmt"
"math"
"path/filepath"
"sort"
"strconv"
"sync"
@@ -249,7 +250,7 @@ func (c ketamaHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
type multiHashring struct {
cache map[string]Hashring
hashrings []Hashring
tenantSets []map[string]struct{}
tenantSets []map[string]tenantMatcher

// We need a mutex to guard concurrent access
// to the cache map, as this is both written to
@@ -273,15 +274,37 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return h.GetN(tenant, ts, n)
}
var found bool

// If the tenant is not in the cache, then we need to check
// every tenant in the configuration.
for i, t := range m.tenantSets {
// If the hashring has no tenants, then it is
// considered a default hashring and matches everything.
if t == nil {
found = true
} else if _, ok := t[tenant]; ok {
found = true
} else {
// Fast path for the common case of direct match.
if mt, ok := t[tenant]; ok && isExactMatcher(mt) {
found = true
} else {
for tenantPattern, matcherType := range t {
switch matcherType {
case TenantMatcherGlob:
matches, err := filepath.Match(tenantPattern, tenant)
if err != nil {
return "", fmt.Errorf("error matching tenant pattern %s (tenant %s): %w", tenantPattern, tenant, err)
}
found = matches
case TenantMatcherTypeExact:
// Already checked above, skipping.
fallthrough
default:
continue
}

}
}

}
if found {
m.mu.Lock()
@@ -320,12 +343,12 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
}
m.nodes = append(m.nodes, hashring.Nodes()...)
m.hashrings = append(m.hashrings, hashring)
var t map[string]struct{}
var t map[string]tenantMatcher
if len(h.Tenants) != 0 {
t = make(map[string]struct{})
t = make(map[string]tenantMatcher)
}
for _, tenant := range h.Tenants {
t[tenant] = struct{}{}
t[tenant] = h.TenantMatcherType
}
m.tenantSets = append(m.tenantSets, t)
}
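The matching order in `multiHashring.GetN` can be summarized as: a nil tenant set matches everything, an exact entry is a fast path, and glob entries fall back to `filepath.Match`. A simplified standalone sketch (no cache or hashring dispatch, and an early return in place of the diff's `found` flag):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// tenantSet maps a tenant pattern to its matcher type:
// "" or "exact" for direct comparison, "glob" for glob patterns.
type tenantSet map[string]string

// matches reports whether tenant is accepted by the set.
func matches(t tenantSet, tenant string) (bool, error) {
	if t == nil {
		// A hashring with no tenants is a default hashring
		// and matches everything.
		return true, nil
	}
	// Fast path for the common case of a direct match.
	if mt, ok := t[tenant]; ok && (mt == "" || mt == "exact") {
		return true, nil
	}
	for pattern, mt := range t {
		if mt != "glob" {
			continue
		}
		ok, err := filepath.Match(pattern, tenant)
		if err != nil {
			return false, fmt.Errorf("error matching tenant pattern %s (tenant %s): %w", pattern, tenant, err)
		}
		if ok {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	set := tenantSet{"foo*": "glob"}
	for _, tenant := range []string{"foobar", "bar"} {
		ok, _ := matches(set, tenant)
		fmt.Println(tenant, ok)
	}
}
```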
test/e2e/receive_test.go (50 additions, 0 deletions)

@@ -9,20 +9,27 @@ import (
"log"
"net/http"
"net/http/httputil"
"os"
"testing"
"time"

"github.com/efficientgo/core/backoff"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/efficientgo/e2e/monitoring/matchers"
logkit "github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/prompb"

"github.com/stretchr/testify/require"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

@@ -968,3 +975,46 @@ test_metric{a="2", b="2"} 1`)
})
})
}

func TestReceiveGlob(t *testing.T) {
e, err := e2e.NewDockerEnvironment("receive-glob")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

h := receive.HashringConfig{
TenantMatcherType: "glob",
Tenants: []string{
"default*",
},
Endpoints: []receive.Endpoint{
{Address: i.InternalEndpoint("grpc")},
},
}

r := e2ethanos.NewReceiveBuilder(e, "router").WithRouting(1, h).Init()
testutil.Ok(t, e2e.StartAndWaitReady(r))

q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

require.NoError(t, runutil.RetryWithLog(logkit.NewLogfmtLogger(os.Stdout), 1*time.Second, make(<-chan struct{}), func() error {
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "aa", Value: "bb"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: time.Now().UnixMilli()},
},
},
},
})
}))

testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "default-tenant")), e2emon.WaitMissingMetrics()))

}