
Commit

Allow users to configure the number of TCP connections per host for Cassandra. (grafana#2666)

* Allow users to configure the number of TCP connections per host for Cassandra.

Signed-off-by: Tom Wilkie <tom@grafana.com>

* Update docs.

Signed-off-by: Tom Wilkie <tom@grafana.com>

* Use flagext.DefaultValues in Cassandra fixtures.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Use separate sessions for reads and writes in Cassandra.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Add changelog entries.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
tomwilkie authored Jun 2, 2020
1 parent 269a004 commit 5efba7f
Showing 2 changed files with 32 additions and 20 deletions.
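
For readers who want to pick up the new option from code rather than flags, here is a minimal sketch of how the resulting Config might be built. It is not part of this commit: the import path, the address, and the value 4 are illustrative assumptions; the field names come from the diff below.

package example // standalone sketch, not part of this commit

import (
	"github.com/cortexproject/cortex/pkg/chunk/cassandra" // assumed import path
	"github.com/cortexproject/cortex/pkg/util/flagext"
)

// exampleConfig builds a Cassandra config with defaults applied, then raises
// the per-host connection count. The address and the value 4 are illustrative;
// the flag default added in this commit is 2.
func exampleConfig() cassandra.Config {
	var cfg cassandra.Config
	flagext.DefaultValues(&cfg)
	cfg.Addresses = "cassandra:9042"
	cfg.NumConnections = 4
	return cfg
}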
cassandra/fixtures.go (7 additions, 6 deletions)
@@ -6,6 +6,7 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/chunk/testutils"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
 )
 
 // GOCQL doesn't provide nice mocks, so we use a real Cassandra instance.
@@ -40,12 +41,12 @@ func Fixtures() ([]testutils.Fixture, error) {
 		return nil, nil
 	}
 
-	cfg := Config{
-		Addresses:         addresses,
-		Keyspace:          "test",
-		Consistency:       "QUORUM",
-		ReplicationFactor: 1,
-	}
+	var cfg Config
+	flagext.DefaultValues(&cfg)
+	cfg.Addresses = addresses
+	cfg.Keyspace = "test"
+	cfg.Consistency = "QUORUM"
+	cfg.ReplicationFactor = 1
 
 	// Get a SchemaConfig with the defaults.
 	schemaConfig := testutils.DefaultSchemaConfig("cassandra")
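
For context on the fixtures change above, the idea behind flagext.DefaultValues is roughly the sketch below (a simplification, not the exact cortex implementation). Registering a config's flags on a throwaway FlagSet is enough, because flag.IntVar, flag.DurationVar, and friends write their default into the target field at registration time.

package example // simplified sketch of the flagext.DefaultValues idea

import "flag"

// Registerer is anything that can register its flags, e.g. the Cassandra Config.
type Registerer interface {
	RegisterFlags(f *flag.FlagSet)
}

// DefaultValues registers each config on a FlagSet that is never parsed;
// the act of registration populates every field with its declared default.
func DefaultValues(rs ...Registerer) {
	fs := flag.NewFlagSet("", flag.PanicOnError)
	for _, r := range rs {
		r.RegisterFlags(fs)
	}
}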
cassandra/storage_client.go (25 additions, 14 deletions)
@@ -42,6 +42,7 @@ type Config struct {
 	MaxBackoff       time.Duration `yaml:"retry_max_backoff"`
 	MinBackoff       time.Duration `yaml:"retry_min_backoff"`
 	QueryConcurrency int           `yaml:"query_concurrency"`
+	NumConnections   int           `yaml:"num_connections"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -66,6 +67,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.MinBackoff, "cassandra.retry-min-backoff", 100*time.Millisecond, "Minimum time to wait before retrying a failed request. (Default = 100ms)")
 	f.DurationVar(&cfg.MaxBackoff, "cassandra.retry-max-backoff", 10*time.Second, "Maximum time to wait before retrying a failed request. (Default = 10s)")
 	f.IntVar(&cfg.QueryConcurrency, "cassandra.query-concurrency", 0, "Limit number of concurrent queries to Cassandra. (Default is 0: no limit)")
+	f.IntVar(&cfg.NumConnections, "cassandra.num-connections", 2, "Number of TCP connections per host.")
 }
 
 func (cfg *Config) Validate() error {
@@ -92,6 +94,7 @@ func (cfg *Config) session() (*gocql.Session, error) {
 	cluster.QueryObserver = observer{}
 	cluster.Timeout = cfg.Timeout
 	cluster.ConnectTimeout = cfg.ConnectTimeout
+	cluster.NumConns = cfg.NumConnections
 	if cfg.Retries > 0 {
 		cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{
 			NumRetries: cfg.Retries,
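
On the gocql side, NumConns is the library's standard per-host connection pool size, which the new setting feeds directly. A minimal standalone sketch of that knob in isolation (the hosts and the value 4 are illustrative, not from this commit):

package main // standalone sketch showing the gocql knob in isolation

import (
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("cassandra-1", "cassandra-2") // illustrative hosts
	cluster.NumConns = 4 // number of TCP connections gocql opens to each host
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatalf("connecting to Cassandra: %v", err)
	}
	defer session.Close()
}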
@@ -197,15 +200,21 @@ func (cfg *Config) createKeyspace() error {
 type StorageClient struct {
 	cfg            Config
 	schemaCfg      chunk.SchemaConfig
-	session        *gocql.Session
+	readSession    *gocql.Session
+	writeSession   *gocql.Session
 	querySemaphore *semaphore.Weighted
 }
 
 // NewStorageClient returns a new StorageClient.
 func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient, error) {
 	pkgutil.WarnExperimentalUse("Cassandra Backend")
 
-	session, err := cfg.session()
+	readSession, err := cfg.session()
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+
+	writeSession, err := cfg.session()
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
@@ -218,15 +227,17 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient,
 	client := &StorageClient{
 		cfg:            cfg,
 		schemaCfg:      schemaCfg,
-		session:        session,
+		readSession:    readSession,
+		writeSession:   writeSession,
 		querySemaphore: querySemaphore,
 	}
 	return client, nil
 }
 
 // Stop implement chunk.IndexClient.
 func (s *StorageClient) Stop() {
-	s.session.Close()
+	s.readSession.Close()
+	s.writeSession.Close()
 }
 
 // Cassandra batching isn't really useful in this case, its more to do multiple
@@ -263,15 +274,15 @@ func (s *StorageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch)
 	b := batch.(*writeBatch)
 
 	for _, entry := range b.entries {
-		err := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)",
+		err := s.writeSession.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, ?, ?)",
 			entry.TableName), entry.HashValue, entry.RangeValue, entry.Value).WithContext(ctx).Exec()
 		if err != nil {
 			return errors.WithStack(err)
 		}
 	}
 
 	for _, entry := range b.deletes {
-		err := s.session.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ? and range = ?",
+		err := s.writeSession.Query(fmt.Sprintf("DELETE FROM %s WHERE hash = ? and range = ?",
 			entry.TableName), entry.HashValue, entry.RangeValue).WithContext(ctx).Exec()
 		if err != nil {
 			return errors.WithStack(err)
@@ -298,27 +309,27 @@ func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callb
 
 	switch {
 	case len(query.RangeValuePrefix) > 0 && query.ValueEqual == nil:
-		q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ?",
+		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ?",
 			query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff'))
 
 	case len(query.RangeValuePrefix) > 0 && query.ValueEqual != nil:
-		q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ? AND value = ? ALLOW FILTERING",
+		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND range < ? AND value = ? ALLOW FILTERING",
 			query.TableName), query.HashValue, query.RangeValuePrefix, append(query.RangeValuePrefix, '\xff'), query.ValueEqual)
 
 	case len(query.RangeValueStart) > 0 && query.ValueEqual == nil:
-		q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?",
+		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ?",
 			query.TableName), query.HashValue, query.RangeValueStart)
 
 	case len(query.RangeValueStart) > 0 && query.ValueEqual != nil:
-		q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND value = ? ALLOW FILTERING",
+		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND range >= ? AND value = ? ALLOW FILTERING",
 			query.TableName), query.HashValue, query.RangeValueStart, query.ValueEqual)
 
 	case query.ValueEqual == nil:
-		q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?",
+		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ?",
 			query.TableName), query.HashValue)
 
 	case query.ValueEqual != nil:
-		q = s.session.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND value = ? ALLOW FILTERING",
+		q = s.readSession.Query(fmt.Sprintf("SELECT range, value FROM %s WHERE hash = ? AND value = ? ALLOW FILTERING",
 			query.TableName), query.HashValue, query.ValueEqual)
 	}
 
@@ -384,7 +395,7 @@ func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
 	}
 
 	// Must provide a range key, even though its not useds - hence 0x00.
-	q := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)",
+	q := s.writeSession.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)",
 		tableName), key, buf)
 	if err := q.WithContext(ctx).Exec(); err != nil {
 		return errors.WithStack(err)
@@ -413,7 +424,7 @@ func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.Decod
 	}
 
 	var buf []byte
-	if err := s.session.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()).
+	if err := s.readSession.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()).
 		WithContext(ctx).Scan(&buf); err != nil {
 		return input, errors.WithStack(err)
 	}
