Skip to content

Commit

Permalink
Allowing for multiple defined targets (#3807)
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Kopping <[email protected]>
  • Loading branch information
Danny Kopping authored Jun 7, 2021
1 parent 71677db commit 3642fac
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
19 changes: 13 additions & 6 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ import (

// Config is the root config for Loki.
type Config struct {
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
HTTPPrefix string `yaml:"http_prefix"`
Target flagext.StringSliceCSV `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
HTTPPrefix string `yaml:"http_prefix"`

Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Expand All @@ -80,7 +80,10 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Server.MetricsNamespace = "loki"
c.Server.ExcludeRequestInLog = true

f.StringVar(&c.Target, "target", All, "target module (default All)")
// Set the default module list to 'all'
c.Target = []string{All}
f.Var(&c.Target, "target", "Comma-separated list of Loki modules to load. "+
"The alias 'all' can be used in the list to load a number of core modules and will enable single-binary mode. ")
f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")

c.Server.RegisterFlags(f)
Expand Down Expand Up @@ -148,6 +151,10 @@ func (c *Config) Validate() error {
return nil
}

func (c *Config) isModuleEnabled(m string) bool {
return util.StringsContain(c.Target, m)
}

// Loki is the root datastructure for Loki.
type Loki struct {
Cfg Config
Expand Down Expand Up @@ -231,7 +238,7 @@ func newDefaultConfig() *Config {

// Run starts Loki running, and blocks until a Loki stops.
func (t *Loki) Run() error {
serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target)
serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target...)
if err != nil {
return err
}
Expand Down Expand Up @@ -393,7 +400,7 @@ func (t *Loki) setupModuleManager() error {
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
if t.Cfg.Target == Querier || t.Cfg.Target == Ruler {
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) {
deps[Store] = append(deps[Store], IngesterQuerier)
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
return nil, err
}

if t.Cfg.Target != All {
if !t.Cfg.isModuleEnabled(All) {
logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)
}

Expand Down Expand Up @@ -291,8 +291,8 @@ func (t *Loki) initStore() (_ services.Service, err error) {

if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
switch t.Cfg.Target {
case Ingester:
switch true {
case t.Cfg.isModuleEnabled(Ingester):
// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory.
Expand All @@ -308,7 +308,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
},
}
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
case Querier, Ruler:
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
Expand All @@ -324,8 +324,8 @@ func (t *Loki) initStore() (_ services.Service, err error) {

if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch t.Cfg.Target {
case Querier, Ruler:
switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
if t.Cfg.Querier.QueryStoreOnly {
break
Expand All @@ -335,7 +335,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier,
calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration),
)
case All:
case t.Cfg.isModuleEnabled(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
Expand Down Expand Up @@ -498,7 +498,7 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() {
if t.Cfg.isModuleEnabled(All) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util_log.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
return
}
Expand Down

0 comments on commit 3642fac

Please sign in to comment.