Skip to content

Commit

Permalink
Add support for Cassandra reconnect interval (#934)
Browse files Browse the repository at this point in the history
* Make cassandra reconnect down hosts.

* Fix #767 by enabling gocql setting `ReconnectInterval` to reconnect to
down Cassandra hosts at a regular interval.

Signed-off-by: Brendan Shaklovitz <[email protected]>

* Add cassandra `ReconnectInterval` test.

Signed-off-by: Brendan Shaklovitz <[email protected]>
  • Loading branch information
nyanshak authored and yurishkuro committed Jul 14, 2018
1 parent a0dc40e commit 7919cd9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
5 changes: 5 additions & 0 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Configuration struct {
Keyspace string `validate:"nonzero"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Expand Down Expand Up @@ -74,6 +75,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Timeout == 0 {
c.Timeout = source.Timeout
}
if c.ReconnectInterval == 0 {
c.ReconnectInterval = source.ReconnectInterval
}
if c.Port == 0 {
c.Port = source.Port
}
Expand Down Expand Up @@ -109,6 +113,7 @@ func (c *Configuration) NewCluster() *gocql.ClusterConfig {
cluster.Keyspace = c.Keyspace
cluster.NumConns = c.ConnectionsPerHost
cluster.Timeout = c.Timeout
cluster.ReconnectInterval = c.ReconnectInterval
cluster.SocketKeepalive = c.SocketKeepAlive
if c.ProtoVersion > 0 {
cluster.ProtoVersion = c.ProtoVersion
Expand Down
43 changes: 25 additions & 18 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@ import (

const (
// session settings
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixReconnectInterval = ".reconnect-interval"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
},
servers: "127.0.0.1",
namespace: primaryNamespace,
Expand Down Expand Up @@ -130,6 +132,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
"Timeout used for queries")
flagSet.Duration(
nsConfig.namespace+suffixReconnectInterval,
nsConfig.ReconnectInterval,
"Reconnect interval to retry connecting to downed hosts")
flagSet.String(
nsConfig.namespace+suffixServers,
nsConfig.servers,
Expand Down Expand Up @@ -204,6 +210,7 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval)
cfg.servers = v.GetString(cfg.namespace + suffixServers)
cfg.Port = v.GetInt(cfg.namespace + suffixPort)
cfg.Keyspace = v.GetString(cfg.namespace + suffixKeyspace)
Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/cassandra/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestOptions(t *testing.T) {
assert.Equal(t, primary.Keyspace, aux.Keyspace)
assert.Equal(t, primary.Servers, aux.Servers)
assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost)
assert.Equal(t, primary.ReconnectInterval, aux.ReconnectInterval)
}

func TestOptionsWithFlags(t *testing.T) {
Expand All @@ -50,6 +51,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--cas.keyspace=jaeger",
"--cas.servers=1.1.1.1,2.2.2.2",
"--cas.connections-per-host=42",
"--cas.reconnect-interval=42s",
"--cas.max-retry-attempts=42",
"--cas.timeout=42s",
"--cas.port=4242",
Expand All @@ -75,6 +77,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, 42, aux.ConnectionsPerHost)
assert.Equal(t, 42, aux.MaxRetryAttempts)
assert.Equal(t, 42*time.Second, aux.Timeout)
assert.Equal(t, 42*time.Second, aux.ReconnectInterval)
assert.Equal(t, 4242, aux.Port)
assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary")
assert.Equal(t, 3, aux.ProtoVersion)
Expand Down

0 comments on commit 7919cd9

Please sign in to comment.