Skip to content

Commit

Permalink
Use a different connection pool for read vs. write operations
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 1, 2023
1 parent 9e76022 commit b8f4119
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 32 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Unreleased
- Update to pgx5 library
- Add query timeouts using context cancellation. The corresponding
configuration settings are ``read_timeout`` and ``write_timeout``.
- Use a different connection pool for read vs. write operations.
The corresponding settings to configure the maximum pool sizes
are ``read_pool_size_max`` and ``write_pool_size_max``.

BREAKING CHANGES
----------------
Expand Down
23 changes: 20 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,22 @@ The configuration settings (with one example endpoint) are as below:
user: "crate" # Username to use (default: "crate")
password: "" # Password to use (default: "").
schema: "" # Schema to use (default: "").
max_connections: 5 # The maximum number of concurrent connections (default: 5).
max_connections: 0 # The maximum number of concurrent connections (default: runtime.NumCPU()).
# It will get forwarded to pgx's `pool_max_conns`, and determines
# the maximum number of connections in the connection pool.
# the maximum number of connections in the connection pool for
# both connection pools (read and write).
read_pool_size_max: 0 # Configure the maximum pool size for read operations individually.
# (default: runtime.NumCPU())
write_pool_size_max: 0 # Configure the maximum pool size for write operations individually.
# (default: runtime.NumCPU())
connect_timeout: 10 # TCP connect timeout (seconds) (default: 10).
# It has the same meaning as libpq's `connect_timeout`.
read_timeout: 5 # Query context timeout for read queries (seconds) (default: 5).
write_timeout: 5 # Query context timeout for write queries (seconds) (default: 5).
enable_tls: false # Whether to connect using TLS (default: false).
allow_insecure_tls: false # Whether to allow insecure / invalid TLS certificates (default: false).
Timeout settings
Timeout Settings
----------------

The unit for all values is *seconds*.
Expand All @@ -116,6 +121,18 @@ The unit for all values is *seconds*.

-- `Query Timeouts - Using Context Cancellation`_

Connection Pool Settings
------------------------

The service uses two connection pools for communicating to the database, one of each
for read vs. write operations. The configuration settings ``max_connections``,
``read_pool_size_max``, and ``write_pool_size_max`` determine the maximum
connection pool sizes, either for both pools at once, or individually.

By default, when not configured otherwise, by either omitting the settings altogether,
or using ``0`` values, ``pgx`` configures the maximum pool size using the number of CPU
cores available to the system it is running on, by calling ``runtime.NumCPU()``.


Prometheus configuration
========================
Expand Down
9 changes: 7 additions & 2 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ cratedb_endpoints:
user: "crate" # Username to use (default: "crate")
password: "" # Password to use (default: "").
schema: "" # Schema to use (default: "").
max_connections: 5 # The maximum number of concurrent connections (default: 5).
max_connections: 0 # The maximum number of concurrent connections (default: runtime.NumCPU()).
# It will get forwarded to pgx's `pool_max_conns`, and determines
# the maximum number of connections in the connection pool.
# the maximum number of connections in the connection pool for
# both connection pools (read and write).
read_pool_size_max: 0 # Configure the maximum pool size for read operations individually.
# (default: runtime.NumCPU())
write_pool_size_max: 0 # Configure the maximum pool size for write operations individually.
# (default: runtime.NumCPU())
connect_timeout: 10 # TCP connect timeout (seconds) (default: 10).
# It has the same meaning as libpq's `connect_timeout`.
read_timeout: 5 # Query context timeout for read queries (seconds) (default: 5).
Expand Down
83 changes: 63 additions & 20 deletions crate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ type crateReadResponse struct {
}

type crateEndpoint struct {
pool *pgxpool.Pool
poolConf *pgxpool.Config
readTimeout time.Duration
writeTimeout time.Duration
poolConf *pgxpool.Config
readPoolSize int
writePoolSize int
readTimeout time.Duration
writeTimeout time.Duration
readPool *pgxpool.Pool
writePool *pgxpool.Pool
}

func newCrateEndpoint(ep *endpointConfig) *crateEndpoint {
Expand All @@ -64,8 +67,11 @@ func newCrateEndpoint(ep *endpointConfig) *crateEndpoint {
// # Example URL
// postgres://jack:[email protected]:5432/mydb?sslmode=verify-ca
connectionString := fmt.Sprintf(
"postgres://%s:%s@%s:%v/%s?connect_timeout=%v&pool_max_conns=%v",
ep.User, ep.Password, ep.Host, ep.Port, ep.Schema, ep.ConnectTimeout, ep.MaxConnections)
"postgres://%s:%s@%s:%v/%s?connect_timeout=%v",
ep.User, ep.Password, ep.Host, ep.Port, ep.Schema, ep.ConnectTimeout)
if ep.MaxConnections != 0 {
connectionString += fmt.Sprintf("&pool_max_conns=%v", ep.MaxConnections)
}
poolConf, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return nil
Expand All @@ -92,24 +98,25 @@ func newCrateEndpoint(ep *endpointConfig) *crateEndpoint {
return err
}
return &crateEndpoint{
poolConf: poolConf,
readTimeout: time.Duration(ep.ReadTimeout) * time.Second,
writeTimeout: time.Duration(ep.WriteTimeout) * time.Second,
poolConf: poolConf,
readPoolSize: ep.ReadPoolSize,
writePoolSize: ep.WritePoolSize,
readTimeout: time.Duration(ep.ReadTimeout) * time.Second,
writeTimeout: time.Duration(ep.WriteTimeout) * time.Second,
}
}

func (c *crateEndpoint) endpoint() endpoint.Endpoint {
/**
* Initialize connection pools lazily here instead of in `newCrateEndpoint()`,
* so that the adapter does not crash on startup if the endpoint is unavailable.
**/
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
// We initialize the connection pool lazily here instead of in newCrateEndpoint() so
// that the adapter does not crash on startup if an endpoint is unavailable.
if c.pool == nil {
pool, err := pgxpool.NewWithConfig(ctx, c.poolConf)
if err != nil {
return nil, fmt.Errorf("error opening connection to CrateDB: %v", err)
}
c.pool = pool
}

// Initialize database connection pools.
err = c.createPools(ctx)

// Dispatch by request type.
switch r := request.(type) {
case *crateWriteRequest:
return nil, c.write(ctx, r)
Expand All @@ -121,6 +128,42 @@ func (c *crateEndpoint) endpoint() endpoint.Endpoint {
}
}

func (c *crateEndpoint) createPools(ctx context.Context) (err error) {
/**
* Initialize two connection pools, one for read/write each.
**/
c.readPool, err = createPoolWithPoolSize(ctx, c.poolConf.Copy(), c.readPoolSize)
if c.readPool == nil {
c.readPool, err = createPoolWithPoolSize(ctx, c.poolConf.Copy(), c.readPoolSize)
if err != nil {
return err
}
}
if c.writePool == nil {
c.writePool, err = createPoolWithPoolSize(ctx, c.poolConf.Copy(), c.writePoolSize)
if err != nil {
return err
}
}
return nil
}

func createPool(ctx context.Context, poolConf *pgxpool.Config) (pool *pgxpool.Pool, err error) {
pool, err = pgxpool.NewWithConfig(ctx, poolConf)
if err != nil {
return nil, fmt.Errorf("error opening connection to CrateDB: %v", err)
} else {
return pool, nil
}
}

func createPoolWithPoolSize(ctx context.Context, poolConf *pgxpool.Config, maxConns int) (pool *pgxpool.Pool, err error) {
if maxConns != 0 {
poolConf.MaxConns = int32(maxConns)
}
return createPool(ctx, poolConf)
}

func (c crateEndpoint) write(ctx context.Context, r *crateWriteRequest) error {
batch := &pgx.Batch{}
for _, a := range r.rows {
Expand Down Expand Up @@ -179,7 +222,7 @@ func (c crateEndpoint) write(ctx context.Context, r *crateWriteRequest) error {
//
ctx, _ = context.WithTimeout(ctx, c.writeTimeout)

batchResults := c.pool.SendBatch(ctx, batch)
batchResults := c.writePool.SendBatch(ctx, batch)
var qerr error
if qerr != nil {
return fmt.Errorf("error executing write batch: %v", qerr)
Expand All @@ -196,7 +239,7 @@ func (c crateEndpoint) read(ctx context.Context, r *crateReadRequest) (*crateRea
// pgx4 implements query timeouts using context cancellation.
// See `write` function for more details.
ctx, _ = context.WithTimeout(ctx, c.readTimeout)
rows, err := c.pool.Query(ctx, r.stmt, nil)
rows, err := c.readPool.Query(ctx, r.stmt, nil)
if err != nil {
return nil, fmt.Errorf("error executing read request query: %v", err)
}
Expand Down
142 changes: 141 additions & 1 deletion crate_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,154 @@
package main

import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
"runtime"
"testing"
)

var CPU_COUNT = int32(runtime.NumCPU())

func TestNewCrateEndpoint(t *testing.T) {
conf := builtinConfig()
endpoint := newCrateEndpoint(&conf.Endpoints[0])
require.Equal(t,
"postgres://crate:@localhost:5432/?connect_timeout=10",
endpoint.poolConf.ConnString(),
)
require.GreaterOrEqual(t, endpoint.poolConf.MaxConns, CPU_COUNT)
}

func TestConfigurePoolDefault(t *testing.T) {
/**
* Verify that, by default, the pool size got configured with one connection per core by default.
**/
poolConf, _ := pgxpool.ParseConfig("postgres://crate:foo@localhost:5432/")
require.Equal(t, "localhost", poolConf.ConnConfig.Host)
require.GreaterOrEqual(t, poolConf.MaxConns, CPU_COUNT)

ctx := context.Background()
pool, err := createPool(ctx, poolConf)
require.IsType(t, &pgxpool.Pool{}, pool)
require.Equal(t, err, nil)
require.GreaterOrEqual(t, pool.Config().MaxConns, CPU_COUNT)
}

func TestConfigurePoolWithPoolSizeFromConnectionString(t *testing.T) {
/**
* Verify that the pool size gets obtained from the database connection string.
**/
poolConf, _ := pgxpool.ParseConfig("postgres://crate:foo@localhost:5432/?pool_max_conns=42")
require.Equal(t, "localhost", poolConf.ConnConfig.Host)
require.Equal(t, int32(42), poolConf.MaxConns)

ctx := context.Background()
pool, err := createPool(ctx, poolConf)
require.IsType(t, &pgxpool.Pool{}, pool)
require.Equal(t, err, nil)
require.Equal(t, int32(42), pool.Config().MaxConns)
}

func TestConfigurePoolWithPoolSizeFromSettingsVanilla(t *testing.T) {
/**
* Verify that the pool size can be configured using a configuration setting.
**/
poolConf, _ := pgxpool.ParseConfig("postgres://crate:foo@localhost:5432/")
require.Equal(t, "localhost", poolConf.ConnConfig.Host)
require.GreaterOrEqual(t, poolConf.MaxConns, CPU_COUNT)

ctx := context.Background()
pool, err := createPoolWithPoolSize(ctx, poolConf, 42)
require.IsType(t, &pgxpool.Pool{}, pool)
require.Equal(t, err, nil)
require.Equal(t, int32(42), pool.Config().MaxConns)
}

func TestConfigurePoolWithPoolSizeFromSettingsPrecedence(t *testing.T) {
/**
* Verify that the pool size configuration setting takes precedence over the connection string.
**/
poolConf, _ := pgxpool.ParseConfig("postgres://crate:foo@localhost:5432/?pool_max_conns=33")
require.Equal(t, "localhost", poolConf.ConnConfig.Host)
require.Equal(t, int32(33), poolConf.MaxConns)

ctx := context.Background()
pool, err := createPoolWithPoolSize(ctx, poolConf, 42)
require.IsType(t, &pgxpool.Pool{}, pool)
require.Equal(t, err, nil)
require.Equal(t, int32(42), pool.Config().MaxConns)
}

func TestPoolsDefault(t *testing.T) {
/**
* Verify connection pool sizes when not configured explicitly.
**/
conf := builtinConfig()
endpoint := newCrateEndpoint(&conf.Endpoints[0])
ctx := context.Background()
endpoint.createPools(ctx)
require.IsType(t, &pgxpool.Pool{}, endpoint.readPool)
require.Equal(t,
"postgres://crate:@localhost:5432/?connect_timeout=10",
endpoint.poolConf.ConnString(),
)
require.GreaterOrEqual(t, endpoint.readPool.Config().MaxConns, CPU_COUNT)
require.GreaterOrEqual(t, endpoint.writePool.Config().MaxConns, CPU_COUNT)
}

func TestPoolsWithMaxConnections(t *testing.T) {
/**
* Verify connection pool sizes when configured using `MaxConnections`.
**/
conf := builtinConfig()
conf.Endpoints[0].MaxConnections = 42
endpoint := newCrateEndpoint(&conf.Endpoints[0])
ctx := context.Background()
endpoint.createPools(ctx)
require.IsType(t, &pgxpool.Pool{}, endpoint.readPool)
require.Equal(t,
"postgres://crate:@localhost:5432/?connect_timeout=10&pool_max_conns=42",
endpoint.poolConf.ConnString(),
)
require.Equal(t, int32(42), endpoint.readPool.Config().MaxConns)
require.Equal(t, int32(42), endpoint.writePool.Config().MaxConns)
}

func TestPoolsWithIndividualPoolSizes(t *testing.T) {
/**
* Verify connection pool sizes when configured using `ReadPoolSize` and `WritePoolSize`.
**/
conf := builtinConfig()
conf.Endpoints[0].ReadPoolSize = 11
conf.Endpoints[0].WritePoolSize = 22
endpoint := newCrateEndpoint(&conf.Endpoints[0])
ctx := context.Background()
endpoint.createPools(ctx)
require.IsType(t, &pgxpool.Pool{}, endpoint.readPool)
require.Equal(t,
"postgres://crate:@localhost:5432/?connect_timeout=10",
endpoint.poolConf.ConnString(),
)
require.Equal(t, int32(11), endpoint.readPool.Config().MaxConns)
require.Equal(t, int32(22), endpoint.writePool.Config().MaxConns)
}

func TestPoolsWithMaxConnectionsAndIndividualPoolSizes(t *testing.T) {
/**
* Verify connection pool sizes when configured using `MaxConnections` and `ReadPoolSize`.
**/
conf := builtinConfig()
conf.Endpoints[0].MaxConnections = 5
conf.Endpoints[0].ReadPoolSize = 40
endpoint := newCrateEndpoint(&conf.Endpoints[0])
ctx := context.Background()
endpoint.createPools(ctx)
require.IsType(t, &pgxpool.Pool{}, endpoint.readPool)
require.Equal(t,
"postgres://crate:@localhost:5432/?connect_timeout=10&pool_max_conns=5",
endpoint.poolConf.ConnString(),
"postgres://crate:@localhost:5432/?connect_timeout=10&pool_max_conns=5")
)
require.Equal(t, int32(40), endpoint.readPool.Config().MaxConns)
require.Equal(t, int32(5), endpoint.writePool.Config().MaxConns)
}
2 changes: 2 additions & 0 deletions fixtures/config_good.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ cratedb_endpoints:
password: "password1"
schema: "schema1"
max_connections: 10
read_pool_size_max: 20
write_pool_size_max: 5
connect_timeout: 30
read_timeout: 60
write_timeout: 30
Expand Down
7 changes: 3 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ type endpointConfig struct {
User string `yaml:"user"`
Password string `yaml:"password"`
Schema string `yaml:"schema"`
MaxConnections int32 `yaml:"max_connections"`
MaxConnections int `yaml:"max_connections"`
ReadPoolSize int `yaml:"read_pool_size_max"`
WritePoolSize int `yaml:"write_pool_size_max"`
ConnectTimeout int `yaml:"connect_timeout"`
ReadTimeout int `yaml:"read_timeout"`
WriteTimeout int `yaml:"write_timeout"`
Expand Down Expand Up @@ -402,9 +404,6 @@ func loadConfig(filename string) (*config, error) {
if conf.Endpoints[i].User == "" {
conf.Endpoints[i].User = "crate"
}
if conf.Endpoints[i].MaxConnections == 0 {
conf.Endpoints[i].MaxConnections = 5
}
if conf.Endpoints[i].ConnectTimeout == 0 {
conf.Endpoints[i].ConnectTimeout = 10
}
Expand Down
Loading

0 comments on commit b8f4119

Please sign in to comment.