Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/blocks #1695

Merged
merged 25 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@ require (
github.com/Azure/go-autorest v11.5.1+incompatible // indirect
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4
github.com/aws/aws-sdk-go v1.23.12
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect
github.com/blang/semver v3.5.0+incompatible
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bradfitz/gomemcache v0.0.0-20190329173943-551aad21a668
github.com/cenkalti/backoff v1.0.0 // indirect
github.com/cespare/xxhash v1.1.0
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/cznic/ql v1.2.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fluent/fluent-logger-golang v1.2.1 // indirect
github.com/fsouza/fake-gcs-server v1.3.0
Expand Down Expand Up @@ -48,6 +47,7 @@ require (
github.com/lib/pq v1.0.0
github.com/mattes/migrate v1.3.1
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/oklog/ulid v1.3.1
github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
github.com/opentracing/opentracing-go v1.1.0
Expand All @@ -62,11 +62,10 @@ require (
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sercand/kuberesolver v2.1.0+incompatible // indirect
github.com/stretchr/testify v1.4.0
github.com/thanos-io/thanos v0.7.0
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/uber-go/atomic v1.3.2 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4
github.com/weaveworks/promrus v1.2.0 // indirect
Expand Down
60 changes: 58 additions & 2 deletions go.sum

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ import (
"github.com/pkg/errors"
)

// Supported storage engines
const (
StorageEngineChunks = "chunks"
StorageEngineTSDB = "tsdb"
)

// StoreLimits helps get Limits specific to Queries for Stores
type StoreLimits interface {
CardinalityLimit(userID string) int
Expand All @@ -27,6 +33,7 @@ type StoreLimits interface {

// Config chooses which storage client to use.
type Config struct {
Engine string `yaml:"engine"`
AWSStorageConfig aws.StorageConfig `yaml:"aws"`
GCPStorageConfig gcp.Config `yaml:"bigtable"`
GCSConfig gcp.GCSConfig `yaml:"gcs"`
Expand All @@ -48,10 +55,20 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.BoltDBConfig.RegisterFlags(f)
cfg.FSConfig.RegisterFlags(f)

f.StringVar(&cfg.Engine, "store.engine", "chunks", "The storage engine to use: chunks or tsdb. Be aware tsdb is experimental and shouldn't be used in production.")
cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f)
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
}

// Validate config and returns error on failure
func (cfg *Config) Validate() error {
if cfg.Engine != StorageEngineChunks && cfg.Engine != StorageEngineTSDB {
return errors.New("unsupported storage engine")
}

return nil
}

// NewStore makes the storage clients based on the configuration.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig)
Expand Down
11 changes: 11 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -69,6 +70,7 @@ type Config struct {
Frontend frontend.Config `yaml:"frontend,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags.
TSDB tsdb.Config `yaml:"tsdb"`

Ruler ruler.Config `yaml:"ruler,omitempty"`
ConfigDB db.Config `yaml:"configdb,omitempty"`
Expand Down Expand Up @@ -100,6 +102,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Frontend.RegisterFlags(f)
c.TableManager.RegisterFlags(f)
c.Encoding.RegisterFlags(f)
c.TSDB.RegisterFlags(f)

c.Ruler.RegisterFlags(f)
c.ConfigDB.RegisterFlags(f)
Expand All @@ -119,6 +122,14 @@ func (c *Config) Validate() error {
if err := c.Encoding.Validate(); err != nil {
return errors.Wrap(err, "invalid encoding config")
}

if err := c.Storage.Validate(); err != nil {
return errors.Wrap(err, "invalid storage config")
}

if err := c.TSDB.Validate(); err != nil {
return errors.Wrap(err, "invalid TSDB config")
}
return nil
}

Expand Down
40 changes: 37 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/thanos/pkg/objstore/s3"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
Expand Down Expand Up @@ -193,7 +194,25 @@ func (t *Cortex) initQuerier(cfg *Config) (err error) {
return
}

queryable, engine := querier.New(cfg.Querier, t.distributor, t.store)
var store querier.ChunkStore

if cfg.Storage.Engine == storage.StorageEngineTSDB {
s3cfg := s3.Config{
Bucket: cfg.TSDB.S3.BucketName,
Endpoint: cfg.TSDB.S3.Endpoint,
AccessKey: cfg.TSDB.S3.AccessKeyID,
SecretKey: cfg.TSDB.S3.SecretAccessKey,
Insecure: cfg.TSDB.S3.Insecure,
}
store, err = querier.NewBlockQuerier(s3cfg, cfg.TSDB.SyncDir, prometheus.DefaultRegisterer)
if err != nil {
return err
}
} else {
store = t.store
}

queryable, engine := querier.New(cfg.Querier, t.distributor, store)
api := v1.NewAPI(
engine,
queryable,
Expand Down Expand Up @@ -228,6 +247,9 @@ func (t *Cortex) stopQuerier() error {

func (t *Cortex) initIngester(cfg *Config) (err error) {
cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB
cfg.Ingester.TSDBConfig = cfg.TSDB

t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
if err != nil {
return
Expand All @@ -246,6 +268,9 @@ func (t *Cortex) stopIngester() error {
}

func (t *Cortex) initStore(cfg *Config) (err error) {
if cfg.Storage.Engine == storage.StorageEngineTSDB {
return nil
}
err = cfg.Schema.Load()
if err != nil {
return
Expand All @@ -256,7 +281,9 @@ func (t *Cortex) initStore(cfg *Config) (err error) {
}

func (t *Cortex) stopStore() error {
t.store.Stop()
if t.store != nil {
t.store.Stop()
}
return nil
}

Expand All @@ -281,6 +308,10 @@ func (t *Cortex) stopQueryFrontend() (err error) {
}

func (t *Cortex) initTableManager(cfg *Config) error {
if cfg.Storage.Engine == storage.StorageEngineTSDB {
return nil // table manager isn't used in v2
}

err := cfg.Schema.Load()
if err != nil {
return err
Expand Down Expand Up @@ -319,7 +350,10 @@ func (t *Cortex) initTableManager(cfg *Config) error {
}

func (t *Cortex) stopTableManager() error {
t.tableManager.Stop()
if t.tableManager != nil {
t.tableManager.Stop()
}

return nil
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/ingester/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ingester

import (
"fmt"
"io"
"strings"

"github.com/thanos-io/thanos/pkg/objstore"
"golang.org/x/net/context"
)

// Bucket is a wrapper around a objstore.Bucket that prepends writes with a userID
type Bucket struct {
UserID string
Bucket objstore.Bucket
}

func (b *Bucket) fullName(name string) string {
return fmt.Sprintf("%s/%s", b.UserID, name)
}

// Close implements io.Closer
func (b *Bucket) Close() error { return b.Bucket.Close() }

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
return b.Bucket.Upload(ctx, b.fullName(name), r)
}

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.Bucket.Delete(ctx, b.fullName(name))
}

// Name returns the bucket name for the provider.
func (b *Bucket) Name() string { return b.Bucket.Name() }

// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
return b.Bucket.Iter(ctx, b.fullName(dir), func(s string) error {
/*
Since all objects are prefixed with the userID we need to strip the userID
upon passing to the processing function
*/
return f(strings.Join(strings.Split(s, "/")[1:], "/"))
})
}

// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.Bucket.Get(ctx, b.fullName(name))
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.Bucket.GetRange(ctx, b.fullName(name), off, length)
}

// Exists checks if the given object exists in the bucket.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
return b.Bucket.Exists(ctx, b.fullName(name))
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
return b.Bucket.IsObjNotFoundErr(err)
}
Loading