Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(persistence): create pgx store #6359

Merged
merged 29 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fcd7abc
feat(persistance): prefix plugin and store with pq
slonka Mar 27, 2023
5f5faab
feat(persistance): compiles
slonka Mar 27, 2023
e2d54a9
feat(persistance): wire in pgx tests
slonka Mar 27, 2023
5f14b54
feat(persistance): wire in pgx store for store operations
slonka Mar 27, 2023
72bbcfa
feat(persistance): fixed update test
slonka Mar 28, 2023
53bee83
feat(persistance): fix other tests
slonka Mar 28, 2023
687c3c7
feat(persistance): make check pass
slonka Mar 28, 2023
1dd3c13
feat(persistance): remove dockerfile modification
slonka Mar 28, 2023
a59c19a
Merge branch 'master' into create-pgx-store
slonka Mar 28, 2023
c13c0ac
ci(circleci): kick ci
slonka Mar 28, 2023
d060e34
feat(persistance): fix dockerfile
slonka Mar 28, 2023
f3b37ae
feat(persistance): tests pass locally
slonka Mar 29, 2023
63d63b2
feat(persistance): make check pass
slonka Mar 29, 2023
3413368
feat(persistance): make it possible to configure pgx specific settings
slonka Mar 29, 2023
a3e4aca
feat(persistance): make check pass
slonka Mar 29, 2023
c65eeed
feat(persistance): wire in pgx as an option to use with migrations
slonka Mar 29, 2023
58819d9
feat(persistance): add upgrade docs
slonka Mar 29, 2023
8f37227
feat(persistance): reword upgrade docs
slonka Mar 29, 2023
ce499fe
feat(persistance): fix kuma defaults
slonka Mar 29, 2023
5a769eb
feat(persistance): warn instead of erroring out on a unsupported pgx …
slonka Mar 29, 2023
4c9f8a2
feat(persistance): fix comments in kuma-cp defaults
slonka Mar 29, 2023
6744cd0
feat(persistance): fix nitpick from pr review
slonka Mar 29, 2023
e29a7fc
feat(persistance): use one plugin
slonka Mar 29, 2023
a3d7dbb
feat(persistance): format change
slonka Mar 29, 2023
d670e7e
feat(persistance): remove leftovers from pgx plugin
slonka Mar 29, 2023
acda410
feat(persistance): rewrite createStore as higher order function
slonka Mar 29, 2023
e143233
feat(persistance): fix nil pointer dereference in create store
slonka Mar 29, 2023
f0515e0
feat(persistance): charly review nits
slonka Mar 30, 2023
a105ebe
feat(persistance): mike review
slonka Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ does not have any particular instructions.

## Upcoming release

### Universal

### Changed default postgres driver to pgx
- If you encounter any problems with the persistence layer please [submit an issue](https://github.com/kumahq/kuma/issues/new) and temporarily switch to the previous driver (`lib/pq`) by setting
slonka marked this conversation as resolved.
Show resolved Hide resolved
`DriverName=postgres` configuration option or `KUMA_STORE_POSTGRES_DRIVER_NAME='postgres'` env variable.
- Several configuration settings are not supported by the new driver right now, if used to configure them please try running with new defaults or [submit an issue](https://github.com/kumahq/kuma/issues/new).
List of unsupported configuration options:
- MaxIdleConnections (used in store)
- MinReconnectInterval (used in events listener)
- MaxReconnectInterval (used in events listener)

### K8s

### Removed deprecated annotations
Expand Down
17 changes: 13 additions & 4 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,19 @@ store:
driverName: pgx # ENV: KUMA_STORE_POSTGRES_DRIVER_NAME
# Connection Timeout to the DB in seconds
connectionTimeout: 5 # ENV: KUMA_STORE_POSTGRES_CONNECTION_TIMEOUT
# Maximum number of open connections to the database
# MaxConnectionLifetime (applied only when driverName=pgx) is the duration since creation after which a connection will be automatically closed
maxConnectionLifetime: "1h" # ENV: KUMA_STORE_POSTGRES_MAX_CONNECTION_LIFETIME
# MaxConnectionLifetimeJitter (applied only when driverName=pgx) is the duration after maxConnectionLifetime to randomly decide to close a connection.
# This helps prevent all connections from being closed at the exact same time, starving the pool.
maxConnectionLifetimeJitter: "1m" # ENV: KUMA_STORE_POSTGRES_MAX_CONNECTION_LIFETIME_JITTER
# HealthCheckInterval (applied only when driverName=pgx) is the duration between checks of the health of idle connections.
healthCheckInterval: "30s" # ENV: KUMA_STORE_POSTGRES_HEALTH_CHECK_INTERVAL
# MinOpenConnections (applied only when driverName=pgx) is the minimum number of open connections to the database
minOpenConnections: 0 # ENV: KUMA_STORE_POSTGRES_MIN_OPEN_CONNECTIONS
# MaxOpenConnections is the maximum number of open connections to the database
# `0` value means number of open connections is unlimited
maxOpenConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_OPEN_CONNECTIONS
# Maximum number of connections in the idle connection pool
# MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool
# <0 value means no idle connections and 0 means default max idle connections
lahabana marked this conversation as resolved.
Show resolved Hide resolved
maxIdleConnections: 50 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS
# TLS settings
Expand All @@ -45,13 +54,13 @@ store:
keyPath: # ENV: KUMA_STORE_POSTGRES_TLS_KEY_PATH
# Path to the root certificate. Used in verifyCa and verifyFull modes.
caPath: # ENV: KUMA_STORE_POSTGRES_TLS_ROOT_CERT_PATH
# MinReconnectInterval controls the duration to wait before trying to
# MinReconnectInterval (applied only when driverName=postgres) controls the duration to wait before trying to
# re-establish the database connection after connection loss. After each
# consecutive failure this interval is doubled, until MaxReconnectInterval
# is reached. Successfully completing the connection establishment procedure
# resets the interval back to MinReconnectInterval.
minReconnectInterval: "10s" # ENV: KUMA_STORE_POSTGRES_MIN_RECONNECT_INTERVAL
slonka marked this conversation as resolved.
Show resolved Hide resolved
# MaxReconnectInterval controls the maximum possible duration to wait before trying
# MaxReconnectInterval (applied only when driverName=postgres) controls the maximum possible duration to wait before trying
# to re-establish the database connection after connection loss.
maxReconnectInterval: "60s" # ENV: KUMA_STORE_POSTGRES_MAX_RECONNECT_INTERVAL

Expand Down
1 change: 1 addition & 0 deletions pkg/config/core/resources/store/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type StoreType = string
const (
KubernetesStore StoreType = "kubernetes"
PostgresStore StoreType = "postgres"
PgxStore StoreType = "pgx"
MemoryStore StoreType = "memory"
)

Expand Down
12 changes: 12 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Store.Postgres.DbName).To(Equal("kuma"))
Expect(cfg.Store.Postgres.DriverName).To(Equal("postgres"))
Expect(cfg.Store.Postgres.ConnectionTimeout).To(Equal(10))
Expect(cfg.Store.Postgres.MaxConnectionLifetime.Duration).To(Equal(123 * time.Minute))
Expect(cfg.Store.Postgres.MaxConnectionLifetimeJitter.Duration).To(Equal(456 * time.Second))
Expect(cfg.Store.Postgres.HealthCheckInterval.Duration).To(Equal(78 * time.Second))
Expect(cfg.Store.Postgres.MinOpenConnections).To(Equal(3))
Expect(cfg.Store.Postgres.MaxOpenConnections).To(Equal(300))
Expect(cfg.Store.Postgres.MaxIdleConnections).To(Equal(300))
Expect(cfg.Store.Postgres.MinReconnectInterval.Duration).To(Equal(44 * time.Second))
Expand Down Expand Up @@ -339,7 +343,11 @@ store:
dbName: kuma
driverName: postgres
connectionTimeout: 10
minOpenConnections: 3
maxOpenConnections: 300
maxConnectionLifetime: 123m
maxConnectionLifetimeJitter: 456s
healthCheckInterval: 78s
maxIdleConnections: 300
minReconnectInterval: 44s
maxReconnectInterval: 55s
Expand Down Expand Up @@ -648,8 +656,12 @@ proxy:
"KUMA_STORE_POSTGRES_DB_NAME": "kuma",
"KUMA_STORE_POSTGRES_DRIVER_NAME": "postgres",
"KUMA_STORE_POSTGRES_CONNECTION_TIMEOUT": "10",
"KUMA_STORE_POSTGRES_MIN_OPEN_CONNECTIONS": "3",
"KUMA_STORE_POSTGRES_MAX_OPEN_CONNECTIONS": "300",
"KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS": "300",
"KUMA_STORE_POSTGRES_MAX_CONNECTION_LIFETIME": "123m",
"KUMA_STORE_POSTGRES_MAX_CONNECTION_LIFETIME_JITTER": "456s",
"KUMA_STORE_POSTGRES_HEALTH_CHECK_INTERVAL": "78s",
"KUMA_STORE_POSTGRES_TLS_MODE": "verifyFull",
"KUMA_STORE_POSTGRES_TLS_CERT_PATH": "/path/to/cert",
"KUMA_STORE_POSTGRES_TLS_KEY_PATH": "/path/to/key",
Expand Down
79 changes: 70 additions & 9 deletions pkg/config/plugins/resources/postgres/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@ import (

"github.com/kumahq/kuma/pkg/config"
config_types "github.com/kumahq/kuma/pkg/config/types"
"github.com/kumahq/kuma/pkg/core"
)

const (
DriverNamePgx = "pgx"
DriverNamePq = "postgres"
DefaultMaxIdleConnections = 50
DefaultMinOpenConnections = 0
)

var _ config.Config = &PostgresStoreConfig{}
var (
_ config.Config = &PostgresStoreConfig{}
logger = core.Log.WithName("postgres-config")
)

var (
DefaultMinReconnectInterval = config_types.Duration{Duration: 10 * time.Second}
DefaultMaxReconnectInterval = config_types.Duration{Duration: 60 * time.Second}
DefaultMaxConnectionLifetime = config_types.Duration{Duration: time.Hour}
DefaultMaxConnectionLifetimeJitter = config_types.Duration{Duration: 1 * time.Minute}
DefaultHealthCheckInterval = config_types.Duration{Duration: 30 * time.Second}
// some of the above settings taken from pgx https://github.com/jackc/pgx/blob/ca022267dbbfe7a8ba7070557352a5cd08f6cb37/pgxpool/pool.go#L18-L22
)

// Postgres store configuration
type PostgresStoreConfig struct {
Expand All @@ -34,21 +49,30 @@ type PostgresStoreConfig struct {
DriverName string `json:"driverName" envconfig:"kuma_store_postgres_driver_name"`
// Connection Timeout to the DB in seconds
ConnectionTimeout int `json:"connectionTimeout" envconfig:"kuma_store_postgres_connection_timeout"`
// Maximum number of open connections to the database
// MaxConnectionLifetime (applied only when driverName=pgx) is the duration since creation after which a connection will be automatically closed
MaxConnectionLifetime config_types.Duration `json:"maxConnectionLifetime" envconfig:"kuma_store_postgres_max_connection_lifetime"`
// MaxConnectionLifetimeJitter (applied only when driverName=pgx) is the duration after MaxConnectionLifetime to randomly decide to close a connection.
// This helps prevent all connections from being closed at the exact same time, starving the pool.
MaxConnectionLifetimeJitter config_types.Duration `json:"maxConnectionLifetimeJitter" envconfig:"kuma_store_postgres_max_connection_lifetime_jitter"`
// HealthCheckInterval (applied only when driverName=pgx) is the duration between checks of the health of idle connections.
HealthCheckInterval config_types.Duration `json:"healthCheckInterval" envconfig:"kuma_store_postgres_health_check_interval"`
// MinOpenConnections (applied only when driverName=pgx) is the minimum number of open connections to the database
MinOpenConnections int `json:"minOpenConnections" envconfig:"kuma_store_postgres_min_open_connections"`
// MaxOpenConnections is the maximum number of open connections to the database
// `0` value means number of open connections is unlimited
MaxOpenConnections int `json:"maxOpenConnections" envconfig:"kuma_store_postgres_max_open_connections"`
// Maximum number of connections in the idle connection pool
// MaxIdleConnections (applied only when driverName=postgres) is the maximum number of connections in the idle connection pool
// <0 value means no idle connections and 0 means default max idle connections
MaxIdleConnections int `json:"maxIdleConnections" envconfig:"kuma_store_postgres_max_idle_connections"`
// TLS settings
TLS TLSPostgresStoreConfig `json:"tls"`
// MinReconnectInterval controls the duration to wait before trying to
// MinReconnectInterval (applied only when driverName=postgres) controls the duration to wait before trying to
// re-establish the database connection after connection loss. After each
// consecutive failure this interval is doubled, until MaxReconnectInterval
// is reached. Successfully completing the connection establishment procedure
// resets the interval back to MinReconnectInterval.
MinReconnectInterval config_types.Duration `json:"minReconnectInterval" envconfig:"kuma_store_postgres_min_reconnect_interval"`
// MaxReconnectInterval controls the maximum possible duration to wait before trying
// MaxReconnectInterval (applied only when driverName=postgres) controls the maximum possible duration to wait before trying
// to re-establish the database connection after connection loss.
MaxReconnectInterval config_types.Duration `json:"maxReconnectInterval" envconfig:"kuma_store_postgres_max_reconnect_interval"`
}
Expand Down Expand Up @@ -178,6 +202,39 @@ func (p *PostgresStoreConfig) Validate() error {
if p.MinReconnectInterval.Duration >= p.MaxReconnectInterval.Duration {
return errors.New("MinReconnectInterval should be less than MaxReconnectInterval")
}
warning := "[WARNING] "
switch p.DriverName {
slonka marked this conversation as resolved.
Show resolved Hide resolved
case DriverNamePgx:
pqAlternativeMessage := "does not have an equivalent for pgx driver. " +
"If you need this setting consider using lib/pq as a postgres driver by setting driverName='postgres' or " +
"setting KUMA_STORE_POSTGRES_DRIVER_NAME='postgres' env variable."
if p.MaxIdleConnections != DefaultMaxIdleConnections {
logger.Info(warning + "MaxIdleConnections " + pqAlternativeMessage)
}
if p.MinReconnectInterval != DefaultMinReconnectInterval {
logger.Info(warning + "MinReconnectInterval " + pqAlternativeMessage)
}
if p.MaxReconnectInterval != DefaultMaxReconnectInterval {
logger.Info(warning + "MaxReconnectInterval " + pqAlternativeMessage)
}
case DriverNamePq:
pgxAlternativeMessage := "does not have an equivalent for pq driver. " +
"If you need this setting consider using pgx as a postgres driver by setting driverName='pgx' or " +
"setting KUMA_STORE_POSTGRES_DRIVER_NAME='pgx' env variable."
if p.MinOpenConnections != DefaultMinOpenConnections {
logger.Info(warning + "MinOpenConnections " + pgxAlternativeMessage)
}
if p.MaxConnectionLifetime != DefaultMaxConnectionLifetime {
logger.Info(warning + "MaxConnectionLifetime " + pgxAlternativeMessage)
}
if p.MaxConnectionLifetimeJitter != DefaultMaxConnectionLifetimeJitter {
logger.Info(warning + "MaxConnectionLifetimeJitter " + pgxAlternativeMessage)
}
if p.HealthCheckInterval != DefaultHealthCheckInterval {
logger.Info(warning + "HealthCheckInterval " + pgxAlternativeMessage)
}
}

return nil
}

Expand All @@ -188,13 +245,17 @@ func DefaultPostgresStoreConfig() *PostgresStoreConfig {
User: "kuma",
Password: "kuma",
DbName: "kuma",
DriverName: "pgx",
DriverName: DriverNamePgx,
ConnectionTimeout: 5,
MaxOpenConnections: 50, // 0 for unlimited
MaxIdleConnections: 50, // 0 for unlimited
MaxIdleConnections: DefaultMaxIdleConnections, // 0 for unlimited
TLS: DefaultTLSPostgresStoreConfig(),
MinReconnectInterval: config_types.Duration{Duration: 10 * time.Second},
MaxReconnectInterval: config_types.Duration{Duration: 60 * time.Second},
MinReconnectInterval: DefaultMinReconnectInterval,
MaxReconnectInterval: DefaultMaxReconnectInterval,
MinOpenConnections: DefaultMinOpenConnections,
MaxConnectionLifetime: DefaultMaxConnectionLifetime,
MaxConnectionLifetimeJitter: DefaultMaxConnectionLifetimeJitter,
HealthCheckInterval: DefaultHealthCheckInterval,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/resources/k8s/store_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ var _ = Describe("KubernetesStore template", func() {
},
Scheme: k8sClientScheme,
}
})
}, "kubernetes")
})
4 changes: 2 additions & 2 deletions pkg/plugins/resources/memory/store_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import (
)

var _ = Describe("MemoryStore template", func() {
test_store.ExecuteStoreTests(memory.NewStore)
test_store.ExecuteOwnerTests(memory.NewStore)
test_store.ExecuteStoreTests(memory.NewStore, "memory")
test_store.ExecuteOwnerTests(memory.NewStore, "memory")
})
3 changes: 3 additions & 0 deletions pkg/plugins/resources/postgres/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package postgres

const duplicateKeyErrorMsg = "duplicate key value violates unique constraint"
19 changes: 13 additions & 6 deletions pkg/plugins/resources/postgres/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("Events", func() {
defer close(eventBusStopCh)
defer close(listenerErrCh)
listener := setupListeners(cfg, driverName, listenerErrCh, listenerStopCh)
go triggerNotifications(cfg, storeErrCh)
go triggerNotifications(cfg, driverName, storeErrCh)

// when
event, err := listener.Recv(eventBusStopCh)
Expand All @@ -67,7 +67,7 @@ var _ = Describe("Events", func() {
listenerStopCh, listenerErrCh, eventBusStopCh, storeErrCh := setupChannels()
defer close(eventBusStopCh)
listener := setupListeners(cfg, driverName, listenerErrCh, listenerStopCh)
go triggerNotifications(cfg, storeErrCh)
go triggerNotifications(cfg, driverName, storeErrCh)

// when
event, err := listener.Recv(eventBusStopCh)
Expand Down Expand Up @@ -110,10 +110,17 @@ func setupChannels() (chan struct{}, chan error, chan struct{}, chan error) {
return listenerStopCh, listenerErrCh, eventBusStopCh, storeErrCh
}

func setupStore(cfg postgres_config.PostgresStoreConfig) store.ResourceStore {
func setupStore(cfg postgres_config.PostgresStoreConfig, driverName string) store.ResourceStore {
metrics, err := core_metrics.NewMetrics("Standalone")
Expect(err).ToNot(HaveOccurred())
pStore, err := NewStore(metrics, cfg)
var pStore store.ResourceStore
if driverName == "pgx" {
cfg.DriverName = postgres_config.DriverNamePgx
pStore, err = NewPgxStore(metrics, cfg)
} else {
cfg.DriverName = postgres_config.DriverNamePq
pStore, err = NewPqStore(metrics, cfg)
}
Expect(err).ToNot(HaveOccurred())
return pStore
}
Expand All @@ -131,8 +138,8 @@ func setupListeners(cfg postgres_config.PostgresStoreConfig, driverName string,
return listener
}

func triggerNotifications(cfg postgres_config.PostgresStoreConfig, storeErrCh chan error) {
pStore := setupStore(cfg)
func triggerNotifications(cfg postgres_config.PostgresStoreConfig, driverName string, storeErrCh chan error) {
pStore := setupStore(cfg, driverName)
defer GinkgoRecover()
for i := 0; !channels.IsClosed(storeErrCh); i++ {
err := pStore.Create(context.Background(), mesh.NewMeshResource(), store.CreateByKey(fmt.Sprintf("mesh-%d", i), ""))
Expand Down
Loading