Skip to content

Commit

Permalink
Allow to configure partitions ring KVStore independently from ingester ring (#7882)
Browse files Browse the repository at this point in the history

* Allow to configure partitions ring KVStore independently from ingester ring

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed integration tests

Signed-off-by: Marco Pracucci <[email protected]>

* Fix backward compatibility integration tests

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Apr 12, 2024
1 parent 81dc01a commit 2d363d0
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 29 deletions.
23 changes: 16 additions & 7 deletions integration/backward_compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,21 @@ var DefaultPreviousVersionImages = map[string]e2emimir.FlagMapper{
"grafana/mimir:2.3.1": e2emimir.ChainFlagMappers(
e2emimir.SetFlagMapper(map[string]string{"-ingester.ring.readiness-check-ring-health": "false"}),
e2emimir.RemoveFlagMapper([]string{"-ingester.native-histograms-ingestion-enabled"}),
removePartitionRingFlags,
),
"grafana/mimir:2.6.0": e2emimir.RemoveFlagMapper([]string{"-ingester.native-histograms-ingestion-enabled"}),
"grafana/mimir:2.7.1": e2emimir.NoopFlagMapper,
"grafana/mimir:2.8.0": e2emimir.NoopFlagMapper,
"grafana/mimir:2.9.1": e2emimir.NoopFlagMapper,
"grafana/mimir:2.10.0": e2emimir.NoopFlagMapper,
"grafana/mimir:2.11.0": e2emimir.NoopFlagMapper,
"grafana/mimir:2.12.0": e2emimir.NoopFlagMapper,
"grafana/mimir:2.6.0": e2emimir.ChainFlagMappers(
e2emimir.RemoveFlagMapper([]string{"-ingester.native-histograms-ingestion-enabled"}),
removePartitionRingFlags,
),
"grafana/mimir:2.7.1": removePartitionRingFlags,
"grafana/mimir:2.8.0": removePartitionRingFlags,
"grafana/mimir:2.9.1": removePartitionRingFlags,
"grafana/mimir:2.10.0": removePartitionRingFlags,
"grafana/mimir:2.11.0": removePartitionRingFlags,
"grafana/mimir:2.12.0": removePartitionRingFlags,
}

// removePartitionRingFlags is a flag mapper that drops the ingester partition
// ring KVStore flags before starting a previous-version Mimir image. It is
// chained into DefaultPreviousVersionImages entries (2.3.1 through 2.12.0)
// above, since those releases presumably predate these flags and would fail
// to start when passed unknown flags — see the mapper usage in the map.
var removePartitionRingFlags = e2emimir.RemoveFlagMapper([]string{
	"-ingester.partition-ring.store",
	"-ingester.partition-ring.consul.hostname",
})
24 changes: 16 additions & 8 deletions integration/e2emimir/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ func NewDistributor(name string, consulAddress string, flags map[string]string,
"-ingester.ring.replication-factor": "1",
"-distributor.remote-timeout": "2s", // Fail fast in integration tests.
// Configure the ingesters ring backend
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.partition-ring.store": "consul",
"-ingester.partition-ring.consul.hostname": consulAddress,
// Configure the distributor ring backend
"-distributor.ring.store": "memberlist",
},
Expand All @@ -113,8 +115,10 @@ func NewQuerier(name string, consulAddress string, flags map[string]string, opti
"-log.level": "warn",
"-ingester.ring.replication-factor": "1",
// Ingesters ring backend.
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.partition-ring.store": "consul",
"-ingester.partition-ring.consul.hostname": consulAddress,
// Query-frontend worker.
"-querier.frontend-client.backoff-min-period": "100ms",
"-querier.frontend-client.backoff-max-period": "100ms",
Expand Down Expand Up @@ -156,8 +160,10 @@ func NewIngester(name string, consulAddress string, flags map[string]string, opt
"-log.level": "warn",
"-ingester.ring.num-tokens": "512",
// Configure the ingesters ring backend
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.partition-ring.store": "consul",
"-ingester.partition-ring.consul.hostname": consulAddress,
// Speed up the startup.
"-ingester.ring.min-ready-duration": "0s",
// Enable native histograms
Expand Down Expand Up @@ -317,8 +323,10 @@ func NewRuler(name string, consulAddress string, flags map[string]string, option
"-target": "ruler",
"-log.level": "warn",
// Configure the ingesters ring backend
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consulAddress,
"-ingester.partition-ring.store": "consul",
"-ingester.partition-ring.consul.hostname": consulAddress,
},
flags,
options...,
Expand Down
1 change: 1 addition & 0 deletions integration/integration_memberlist_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func newSingleBinary(name string, servername string, join string, testFlags map[
"-ingester.ring.min-ready-duration": "0s",
"-ingester.ring.num-tokens": "512",
"-ingester.ring.store": "memberlist",
"-ingester.partition-ring.store": "memberlist",
"-memberlist.bind-port": "8000",
"-memberlist.left-ingesters-timeout": "600s", // effectively disable
}
Expand Down
6 changes: 4 additions & 2 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,10 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
"-blocks-storage.bucket-store.index-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),

// Ingester.
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-ingester.partition-ring.store": "consul",
"-ingester.partition-ring.consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-ingester.ring.replication-factor": strconv.FormatInt(seriesReplicationFactor, 10),
"-distributor.ring.store": "consul",
Expand Down
6 changes: 4 additions & 2 deletions integration/single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func TestMimirShouldStartInSingleBinaryModeWithAllMemcachedConfigured(t *testing
"-blocks-storage.bucket-store.chunks-cache.backend": "memcached",
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
// Ingester.
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-ingester.ring.store": "consul",
"-ingester.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-ingester.partition-ring.store": "consul",
"-ingester.partition-ring.consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-ingester.ring.replication-factor": "2",
"-distributor.ring.store": "consul",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,9 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing,
return nil, errors.Wrap(err, "creating ingest storage reader")
}

partitionRingKV := cfg.IngesterPartitionRing.kvMock
partitionRingKV := cfg.IngesterPartitionRing.KVStore.Mock
if partitionRingKV == nil {
partitionRingKV, err = kv.NewClient(cfg.IngesterRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger)
partitionRingKV, err = kv.NewClient(cfg.IngesterPartitionRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger)
if err != nil {
return nil, errors.Wrap(err, "creating KV store for ingester partition ring")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/ingester_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestIngester_Start(t *testing.T) {
}))

// Add the partition and owner in the ring, in order to simulate an ingester restart.
require.NoError(t, cfg.IngesterPartitionRing.kvMock.CAS(context.Background(), PartitionRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
require.NoError(t, cfg.IngesterPartitionRing.KVStore.Mock.CAS(context.Background(), PartitionRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
partitionID, err := ingest.IngesterPartitionID(cfg.IngesterRing.InstanceID)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -585,12 +585,12 @@ func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, over
ingesterCfg.IngestStorageConfig.KafkaConfig.LastProducedOffsetPollInterval = 100 * time.Millisecond

// Create the partition ring store.
kv := ingesterCfg.IngesterPartitionRing.kvMock
kv := ingesterCfg.IngesterPartitionRing.KVStore.Mock
if kv == nil {
var closer io.Closer
kv, closer = consul.NewInMemoryClient(ring.GetPartitionRingCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
ingesterCfg.IngesterPartitionRing.kvMock = kv
ingesterCfg.IngesterPartitionRing.KVStore.Mock = kv
}

ingesterCfg.IngesterPartitionRing.MinOwnersDuration = 0
Expand Down
9 changes: 6 additions & 3 deletions pkg/ingester/ingester_partition_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
)

type PartitionRingConfig struct {
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances. This option needs be set on ingesters, distributors, queriers and rulers when running in microservices mode."`

// MinOwnersCount maps to ring.PartitionInstanceLifecyclerConfig's WaitOwnersCountOnPending.
MinOwnersCount int `yaml:"min_partition_owners_count"`

Expand All @@ -20,15 +22,16 @@ type PartitionRingConfig struct {
// DeleteInactivePartitionAfter maps to ring.PartitionInstanceLifecyclerConfig's DeleteInactivePartitionAfterDuration.
DeleteInactivePartitionAfter time.Duration `yaml:"delete_inactive_partition_after"`

// kvMock is a kv.Client mock used for testing.
kvMock kv.Client `yaml:"-"`

// lifecyclerPollingInterval is the lifecycler polling interval. This setting is used to lower it in tests.
lifecyclerPollingInterval time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *PartitionRingConfig) RegisterFlags(f *flag.FlagSet) {
// Ring flags
cfg.KVStore.Store = "memberlist" // Override default value.
cfg.KVStore.RegisterFlagsWithPrefix("ingester.partition-ring.", "collectors/", f)

f.IntVar(&cfg.MinOwnersCount, "ingester.partition-ring.min-partition-owners-count", 1, "Minimum number of owners to wait before a PENDING partition gets switched to ACTIVE.")
f.DurationVar(&cfg.MinOwnersDuration, "ingester.partition-ring.min-partition-owners-duration", 10*time.Second, "How long the minimum number of owners should have been enforced before a PENDING partition gets switched to ACTIVE.")
f.DurationVar(&cfg.DeleteInactivePartitionAfter, "ingester.partition-ring.delete-inactive-partition-after", 13*time.Hour, "How long to wait before an INACTIVE partition is eligible for deletion. The partition will be deleted only if it has been in INACTIVE state for at least the configured duration and it has no owners registered. A value of 0 disables partitions deletion.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/owned_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ func TestOwnedSeriesServiceWithPartitionsRing(t *testing.T) {

c.cfg = defaultIngesterTestConfig(t)
c.cfg.IngesterRing.InstanceID = fmt.Sprintf("ingester-%d", tc.registerPartitionID) // Ingester owns partition based on instance ID.
c.cfg.IngesterPartitionRing.kvMock = c.kvStore // Set ring with our in-memory KV, that we will use for watching.
c.cfg.IngesterPartitionRing.KVStore.Mock = c.kvStore // Set ring with our in-memory KV, that we will use for watching.
c.cfg.BlocksStorageConfig.TSDB.Dir = "" // Don't use default value, otherwise

var err error
Expand Down
5 changes: 4 additions & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (t *Mimir) initVault() (services.Service, error) {
t.Cfg.Compactor.ShardingRing.Common.KVStore.StoreConfig.Etcd.TLS.Reader = t.Vault
t.Cfg.Distributor.DistributorRing.Common.KVStore.StoreConfig.Etcd.TLS.Reader = t.Vault
t.Cfg.Ingester.IngesterRing.KVStore.StoreConfig.Etcd.TLS.Reader = t.Vault
t.Cfg.Ingester.IngesterPartitionRing.KVStore.StoreConfig.Etcd.TLS.Reader = t.Vault
t.Cfg.Ruler.Ring.Common.KVStore.StoreConfig.Etcd.TLS.Reader = t.Vault
t.Cfg.StoreGateway.ShardingRing.KVStore.StoreConfig.Etcd.TLS.Reader = t.Vault
t.Cfg.QueryScheduler.ServiceDiscovery.SchedulerRing.KVStore.StoreConfig.Etcd.TLS.Reader = t.Vault
Expand Down Expand Up @@ -364,7 +365,7 @@ func (t *Mimir) initIngesterPartitionRing() (services.Service, error) {
return nil, nil
}

kvClient, err := kv.NewClient(t.Cfg.Ingester.IngesterRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(t.Registerer, ingester.PartitionRingName+"-watcher"), util_log.Logger)
kvClient, err := kv.NewClient(t.Cfg.Ingester.IngesterPartitionRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(t.Registerer, ingester.PartitionRingName+"-watcher"), util_log.Logger)
if err != nil {
return nil, errors.Wrap(err, "creating KV store for ingester partitions ring watcher")
}
Expand Down Expand Up @@ -420,6 +421,7 @@ func (t *Mimir) initRuntimeConfig() (services.Service, error) {
t.Cfg.Compactor.ShardingRing.Common.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Distributor.DistributorRing.Common.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Ingester.IngesterRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Ingester.IngesterPartitionRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Ruler.Ring.Common.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.StoreGateway.ShardingRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.QueryScheduler.ServiceDiscovery.SchedulerRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
Expand Down Expand Up @@ -995,6 +997,7 @@ func (t *Mimir) initMemberlistKV() (services.Service, error) {
// Update the config.
t.Cfg.Distributor.DistributorRing.Common.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.IngesterRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.IngesterPartitionRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Compactor.ShardingRing.Common.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ruler.Ring.Common.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
Expand Down
6 changes: 6 additions & 0 deletions pkg/mimir/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,15 @@ func TestMultiKVSetup(t *testing.T) {
All: func(t *testing.T, c Config) {
require.NotNil(t, c.Distributor.DistributorRing.Common.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterPartitionRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.StoreGateway.ShardingRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Compactor.ShardingRing.Common.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ruler.Ring.Common.KVStore.Multi.ConfigProvider)
},

Ruler: func(t *testing.T, c Config) {
require.NotNil(t, c.Ingester.IngesterRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterPartitionRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.StoreGateway.ShardingRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ruler.Ring.Common.KVStore.Multi.ConfigProvider)
},
Expand All @@ -194,10 +196,12 @@ func TestMultiKVSetup(t *testing.T) {
Distributor: func(t *testing.T, c Config) {
require.NotNil(t, c.Distributor.DistributorRing.Common.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterPartitionRing.KVStore.Multi.ConfigProvider)
},

Ingester: func(t *testing.T, c Config) {
require.NotNil(t, c.Ingester.IngesterRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterPartitionRing.KVStore.Multi.ConfigProvider)
},

StoreGateway: func(t *testing.T, c Config) {
Expand All @@ -207,6 +211,7 @@ func TestMultiKVSetup(t *testing.T) {
Querier: func(t *testing.T, c Config) {
require.NotNil(t, c.StoreGateway.ShardingRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterRing.KVStore.Multi.ConfigProvider)
require.NotNil(t, c.Ingester.IngesterPartitionRing.KVStore.Multi.ConfigProvider)
},

Compactor: func(t *testing.T, c Config) {
Expand Down Expand Up @@ -327,6 +332,7 @@ func TestInitVault(t *testing.T) {
require.NotNil(t, mimir.Cfg.Compactor.ShardingRing.Common.KVStore.StoreConfig.Etcd.TLS.Reader)
require.NotNil(t, mimir.Cfg.Distributor.DistributorRing.Common.KVStore.StoreConfig.Etcd.TLS.Reader)
require.NotNil(t, mimir.Cfg.Ingester.IngesterRing.KVStore.StoreConfig.Etcd.TLS.Reader)
require.NotNil(t, mimir.Cfg.Ingester.IngesterPartitionRing.KVStore.StoreConfig.Etcd.TLS.Reader)
require.NotNil(t, mimir.Cfg.Ruler.Ring.Common.KVStore.StoreConfig.Etcd.TLS.Reader)
require.NotNil(t, mimir.Cfg.StoreGateway.ShardingRing.KVStore.StoreConfig.Etcd.TLS.Reader)
require.NotNil(t, mimir.Cfg.QueryScheduler.ServiceDiscovery.SchedulerRing.KVStore.StoreConfig.Etcd.TLS.Reader)
Expand Down

0 comments on commit 2d363d0

Please sign in to comment.