Skip to content

Commit

Permalink
Add ability to consume content of the hashring directly
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Sep 3, 2020
1 parent 3ce1da8 commit cfb0bda
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 73 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

- [#3121](https://github.com/thanos-io/thanos/pull/3121) Receive: Added `--receive.hashrings` alternative to `receive.hashrings-file` flag (lower priority). Content of JSON file that contains the hashring configuration.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - in release process.

:warning: **WARNING** :warning: Thanos Rule's `/api/v1/rules` endpoint no longer returns the old, deprecated `partial_response_strategy`. The old, deprecated value has been fixed to `WARN` for quite some time. _Please_ use `partialResponseStrategy`.
Expand All @@ -24,7 +26,6 @@ sse_config:
type: SSE-S3
```
### Fixed
- [#2937](https://github.com/thanos-io/thanos/pull/2937) Receive: Fixing auto-configuration of --receive.local-endpoint
Expand Down
49 changes: 34 additions & 15 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/extkingpin"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -63,8 +65,7 @@ func registerReceive(app *extkingpin.App) {

retention := modelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention.").Default("15d"))

hashringsFile := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration.").
PlaceHolder("<path>").String()
hashringsFile := extflag.RegisterPathOrContent(cmd, "receive.hashrings", "JSON file that contains the hashring configuration.", false)

refreshInterval := modelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down Expand Up @@ -101,14 +102,6 @@ func registerReceive(app *extkingpin.App) {
return errors.Wrap(err, "parse labels")
}

var cw *receive.ConfigWatcher
if *hashringsFile != "" {
cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval)
if err != nil {
return err
}
}

tsdbOpts := &tsdb.Options{
MinBlockDuration: int64(time.Duration(*tsdbMinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(*tsdbMaxBlockDuration) / time.Millisecond),
Expand Down Expand Up @@ -154,7 +147,8 @@ func registerReceive(app *extkingpin.App) {
tsdbOpts,
*ignoreBlockSize,
lset,
cw,
hashringsFile,
refreshInterval,
*localEndpoint,
*tenantHeader,
*defaultTenantID,
Expand Down Expand Up @@ -193,7 +187,8 @@ func runReceive(
tsdbOpts *tsdb.Options,
ignoreBlockSize bool,
lset labels.Labels,
cw *receive.ConfigWatcher,
hashringsFile *extflag.PathOrContent,
refreshInterval *model.Duration,
endpoint string,
tenantHeader string,
defaultTenantID string,
Expand Down Expand Up @@ -369,7 +364,13 @@ func runReceive(
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)

if cw != nil {
// The Hashrings config file path given initializing config watcher.
if configPath, err := hashringsFile.Path(); err != nil {
cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, configPath, *refreshInterval)
if err != nil {
return errors.Wrap(err, "failed to initialize config watcher")
}

// Check the hashring configuration on before running the watcher.
if err := cw.ValidateConfig(); err != nil {
cw.Stop()
Expand All @@ -379,15 +380,33 @@ func runReceive(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return receive.HashringFromConfig(ctx, updates, cw)
return receive.HashringFromConfigWatcher(ctx, updates, cw)
}, func(error) {
cancel()
})
} else {
configContent, err := hashringsFile.Content()
if err != nil {
return errors.Wrap(err, "failed to read hashrings configuration file")
}

var ring receive.Hashring
// The Hashrings config file content given initialize configuration from content.
if len(configContent) > 0 {
ring, err = receive.HashringFromConfig(configContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
}
} else {
// The hashring file is not specified use single node hashring.
ring = receive.SingleNodeHashring(endpoint)
}

cancel := make(chan struct{})
g.Add(func() error {
defer close(updates)
updates <- receive.SingleNodeHashring(endpoint)
updates <- ring
<-cancel
return nil
}, func(error) {
Expand Down
8 changes: 6 additions & 2 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,13 @@ Flags:
https://thanos.io/tip/thanos/storage.md/#configuration
--tsdb.retention=15d How long to retain raw samples on local
storage. 0d - disables this retention.
--receive.hashrings-file=<path>
Path to file that contains the hashring
--receive.hashrings-file=<file-path>
Path to JSON file that contains the hashring
configuration.
--receive.hashrings=<content>
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of JSON file that
contains the hashring configuration.
--receive.hashrings-file-refresh-interval=5m
Refresh interval to re-read the hashring
configuration file. (used as a fallback)
Expand Down
25 changes: 20 additions & 5 deletions pkg/extflag/pathorcontent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,45 @@ func RegisterPathOrContent(cmd FlagClause, flagName string, help string, require
}
}

// Content returns content of the file. Flag that specifies path has priority.
// Content returns the content of the file. Flag that specifies path has priority.
// It returns error if the content is empty and required flag is set to true.
func (p *PathOrContent) Content() ([]byte, error) {
contentFlagName := p.flagName
fileFlagName := fmt.Sprintf("%s-file", p.flagName)

if len(*p.path) > 0 && len(*p.content) > 0 {
return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, contentFlagName)
return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, p.flagName)
}

var content []byte
if len(*p.path) > 0 {
c, err := ioutil.ReadFile(*p.path)
if err != nil {
return nil, errors.Wrapf(err, "loading YAML file %s for %s", *p.path, fileFlagName)
return nil, errors.Wrapf(err, "loading file %s for %s", *p.path, fileFlagName)
}
content = c
} else {
content = []byte(*p.content)
}

if len(content) == 0 && p.required {
return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, contentFlagName)
return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, p.flagName)
}

return content, nil
}

// Path returns the path of the file. Flag that specifies path has priority.
// It returns error if the required flag is set to true.
func (p *PathOrContent) Path() (string, error) {
fileFlagName := fmt.Sprintf("%s-file", p.flagName)

if len(*p.path) > 0 && len(*p.content) > 0 {
return "", errors.Errorf("both %s and %s flags set.", fileFlagName, p.flagName)
}

if len(*p.path) == 0 && p.required {
return "", errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, p.flagName)
}

return *p.path, nil
}
82 changes: 41 additions & 41 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,35 +176,42 @@ func (cw *ConfigWatcher) C() <-chan []HashringConfig {

// ValidateConfig returns an error if the configuration that's being watched is not valid.
func (cw *ConfigWatcher) ValidateConfig() error {
_, _, err := cw.loadConfig()
_, _, err := loadConfig(cw.logger, cw.path)
return err
}

// loadConfig loads raw configuration content and returns a configuration.
func (cw *ConfigWatcher) loadConfig() ([]HashringConfig, float64, error) {
cfgContent, err := cw.readFile()
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}
// Stop shuts down the config watcher.
func (cw *ConfigWatcher) Stop() {
level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path)

config, err := cw.parseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}
done := make(chan struct{})
defer close(done)

// If hashring is empty, return an error.
if len(config) == 0 {
return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", cw.path)
// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-cw.watcher.Errors:
case <-cw.watcher.Events:
// Drain all events and errors.
case <-done:
return
}
}
}()
if err := cw.watcher.Close(); err != nil {
level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err)
}

return config, hashAsMetricValue(cfgContent), nil
close(cw.ch)
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
}

// refresh reads the configured file and sends the hashring configuration on the channel.
func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.refreshCounter.Inc()

config, cfgHash, err := cw.loadConfig()
config, cfgHash, err := loadConfig(cw.logger, cw.path)
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path)
Expand Down Expand Up @@ -238,50 +245,43 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) {
}
}

// Stop shuts down the config watcher.
func (cw *ConfigWatcher) Stop() {
level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path)
// loadConfig loads raw configuration content and returns a configuration.
func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) {
cfgContent, err := readFile(logger, path)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}

done := make(chan struct{})
defer close(done)
config, err := parseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}

// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-cw.watcher.Errors:
case <-cw.watcher.Events:
// Drain all events and errors.
case <-done:
return
}
}
}()
if err := cw.watcher.Close(); err != nil {
level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err)
// If hashring is empty, return an error.
if len(config) == 0 {
return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", path)
}

close(cw.ch)
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
return config, hashAsMetricValue(cfgContent), nil
}

// readFile reads the configuration file and returns content of configuration file.
func (cw *ConfigWatcher) readFile() ([]byte, error) {
fd, err := os.Open(cw.path)
func readFile(logger log.Logger, path string) ([]byte, error) {
fd, err := os.Open(path)
if err != nil {
return nil, err
}
defer func() {
if err := fd.Close(); err != nil {
level.Error(cw.logger).Log("msg", "failed to close file", "err", err, "path", cw.path)
level.Error(logger).Log("msg", "failed to close file", "err", err, "path", path)
}
}()

return ioutil.ReadAll(fd)
}

// parseConfig parses the raw configuration content and returns a HashringConfig.
func (cw *ConfigWatcher) parseConfig(content []byte) ([]HashringConfig, error) {
func parseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
Expand Down
20 changes: 18 additions & 2 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cespare/xxhash"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand Down Expand Up @@ -158,14 +159,14 @@ func newMultiHashring(cfg []HashringConfig) Hashring {
return m
}

// HashringFromConfig creates multi-tenant hashrings from a
// HashringFromConfigWatcher creates multi-tenant hashrings from a
// hashring configuration file watcher.
// The configuration file is watched for updates.
// Hashrings are returned on the updates channel.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

Expand All @@ -181,3 +182,18 @@ func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *Config
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(content []byte) (Hashring, error) {
config, err := parseConfig(content)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
}

// If hashring is empty, return an error.
if len(config) == 0 {
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(config), err
}
6 changes: 4 additions & 2 deletions scripts/cfggen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/cacheutil"
http_util "github.com/thanos-io/thanos/pkg/http"
Expand All @@ -34,8 +37,6 @@ import (
"github.com/thanos-io/thanos/pkg/tracing/jaeger"
"github.com/thanos-io/thanos/pkg/tracing/lightstep"
"github.com/thanos-io/thanos/pkg/tracing/stackdriver"
kingpin "gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)

var (
Expand All @@ -54,6 +55,7 @@ var (
trclient.ELASTIC_APM: elasticapm.Config{},
trclient.LIGHTSTEP: lightstep.Config{},
}
// TODO(kakkoyun): Add Bucket cache.
indexCacheConfigs = map[storecache.IndexCacheProvider]interface{}{
storecache.INMEMORY: storecache.InMemoryIndexCacheConfig{},
storecache.MEMCACHED: cacheutil.MemcachedClientConfig{},
Expand Down
Loading

0 comments on commit cfb0bda

Please sign in to comment.