From 705c12cc34bc333913acc78622ec58c8f5a9d30f Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Fri, 6 Sep 2024 18:50:33 -0400 Subject: [PATCH 01/18] Use OTEL TLS Config For Cassandra Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config.go | 52 +++++++++---------- plugin/storage/cassandra/factory.go | 11 ++-- plugin/storage/cassandra/factory_test.go | 2 +- plugin/storage/cassandra/options.go | 3 +- plugin/storage/cassandra/options_test.go | 2 +- .../storage/cassandra/savetracetest/main.go | 2 +- 6 files changed, 32 insertions(+), 40 deletions(-) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 80f51db77e8..e19a14f4b78 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -5,36 +5,36 @@ package config import ( + "context" "fmt" "time" "github.com/asaskevich/govalidator" "github.com/gocql/gocql" - "go.uber.org/zap" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/pkg/cassandra" gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" ) // Configuration describes the configuration properties needed to connect to a Cassandra cluster type Configuration struct { - Servers []string `valid:"required,url" mapstructure:"servers"` - Keyspace string `mapstructure:"keyspace"` - LocalDC string `mapstructure:"local_dc"` - ConnectionsPerHost int `mapstructure:"connections_per_host"` - Timeout time.Duration `mapstructure:"-"` - ConnectTimeout time.Duration `mapstructure:"connection_timeout"` - ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` - SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` - MaxRetryAttempts int `mapstructure:"max_retry_attempts"` - ProtoVersion int `mapstructure:"proto_version"` - Consistency string `mapstructure:"consistency"` - DisableCompression bool `mapstructure:"disable_compression"` - Port int `mapstructure:"port"` - Authenticator Authenticator `mapstructure:",squash"` - DisableAutoDiscovery bool `mapstructure:"-"` - TLS tlscfg.Options `mapstructure:"tls"` + Servers []string `valid:"required,url" mapstructure:"servers"` + Keyspace string `mapstructure:"keyspace"` + LocalDC string `mapstructure:"local_dc"` + ConnectionsPerHost int `mapstructure:"connections_per_host"` + Timeout time.Duration `mapstructure:"-"` + ConnectTimeout time.Duration `mapstructure:"connection_timeout"` + ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` + SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` + MaxRetryAttempts int `mapstructure:"max_retry_attempts"` + ProtoVersion int `mapstructure:"proto_version"` + Consistency string `mapstructure:"consistency"` + DisableCompression bool `mapstructure:"disable_compression"` + Port int `mapstructure:"port"` + Authenticator Authenticator `mapstructure:",squash"` + DisableAutoDiscovery bool `mapstructure:"-"` + TLS configtls.ClientConfig `mapstructure:"tls"` } func DefaultConfiguration() Configuration { @@ -92,12 +92,12 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { // SessionBuilder creates new cassandra.Session type SessionBuilder interface { - NewSession(logger *zap.Logger) (cassandra.Session, error) + NewSession() (cassandra.Session, error) } // NewSession creates a new Cassandra session -func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error) { - cluster, err := c.NewCluster(logger) +func (c *Configuration) NewSession() (cassandra.Session, error) { + cluster, err := c.NewCluster() if err != nil { return nil, err } @@ -109,7 +109,7 @@ func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error } // NewCluster creates a new gocql cluster from the configuration -func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) { +func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { cluster := gocql.NewCluster(c.Servers...) cluster.Keyspace = c.Keyspace cluster.NumConns = c.ConnectionsPerHost @@ -150,11 +150,11 @@ func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, er AllowedAuthenticators: c.Authenticator.Basic.AllowedAuthenticators, } } - tlsCfg, err := c.TLS.Config(logger) + tlsCfg, err := c.TLS.LoadTLSConfig(context.Background()) if err != nil { return nil, err } - if c.TLS.Enabled { + if !c.TLS.Insecure { cluster.SslOpts = &gocql.SslOptions{ Config: tlsCfg, } @@ -167,10 +167,6 @@ func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, er return cluster, nil } -func (c *Configuration) Close() error { - return c.TLS.Close() -} - func (c *Configuration) String() string { return fmt.Sprintf("%+v", *c) } diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 8dbc5c90fe2..e26c215bac7 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -137,14 +137,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession(logger) + primarySession, err := f.primaryConfig.NewSession() if err != nil { return err } f.primarySession = primarySession if f.archiveConfig != nil { - archiveSession, err := f.archiveConfig.NewSession(logger) + archiveSession, err := f.archiveConfig.NewSession() if err != nil { return err } @@ -251,12 +251,7 @@ func (f *Factory) Close() error { f.archiveSession.Close() } - var errs []error - if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { - errs = append(errs, cfg.TLS.Close()) - } - errs = append(errs, f.Options.GetPrimary().TLS.Close()) - return errors.Join(errs...) + return nil } func (f *Factory) Purge(_ context.Context) error { diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index a97a1abc985..4f92fe7181a 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -34,7 +34,7 @@ func newMockSessionBuilder(session *mocks.Session, err error) *mockSessionBuilde } } -func (m *mockSessionBuilder) NewSession(*zap.Logger) (cassandra.Session, error) { +func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { return m.session, m.err } diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index 1a8e338c8d9..ff7792bf459 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -247,11 +247,12 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) { cfg.Authenticator.Basic.AllowedAuthenticators = strings.Split(authentication, ",") cfg.DisableCompression = v.GetBool(cfg.namespace + suffixDisableCompression) var err error - cfg.TLS, err = tlsFlagsConfig.InitFromViper(v) + tlsCfg, err := tlsFlagsConfig.InitFromViper(v) if err != nil { // TODO refactor to be able to return error log.Fatal(err) } + cfg.TLS = tlsCfg.ToOtelClientConfig() } // GetPrimary returns primary configuration. diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index b2a00245169..e405f7abc07 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -102,7 +102,7 @@ func TestDefaultTlsHostVerify(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.False(t, primary.TLS.SkipHostVerify) + assert.False(t, primary.TLS.InsecureSkipVerify) } func TestEmptyBlackWhiteLists(t *testing.T) { diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index 632d9c765e9..c510202ae01 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -30,7 +30,7 @@ func main() { ProtoVersion: 4, Keyspace: "jaeger_v1_test", } - cqlSession, err := cConfig.NewSession(logger) + cqlSession, err := cConfig.NewSession() if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) } From 953a8aeb8926a0b6d804c795db3ec60f3665b8dc Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Fri, 6 Sep 2024 19:34:04 -0400 Subject: [PATCH 02/18] Set Insecure Field In Cassandra Docker Compose Configuration Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-cassandra.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 893b3a1800f..c5897f7f039 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -23,11 +23,15 @@ extensions: keyspace: "jaeger_v1_dc1" username: "cassandra" password: "cassandra" + tls: + insecure: true another_storage: cassandra: keyspace: "jaeger_v1_dc1" username: "cassandra" password: "cassandra" + tls: + insecure: true receivers: otlp: protocols: From 7fba7a037e17682a2f4b6e08121bf6c6b78917fb Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 19:33:07 -0400 Subject: [PATCH 03/18] Group Configurations Into Connection And Schema Signed-off-by: Mahad Zaryab --- .../extension/jaegerstorage/config_test.go | 2 +- pkg/cassandra/config/config.go | 135 ++++++++++-------- plugin/storage/cassandra/factory_test.go | 8 +- plugin/storage/cassandra/options.go | 66 ++++----- plugin/storage/cassandra/options_test.go | 48 +++---- .../storage/cassandra/savetracetest/main.go | 14 +- 6 files changed, 147 insertions(+), 126 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config_test.go b/cmd/jaeger/internal/extension/jaegerstorage/config_test.go index ca0b8735cdd..58a000051bd 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config_test.go @@ -83,7 +83,7 @@ backends: `) cfg := createDefaultConfig().(*Config) require.NoError(t, conf.Unmarshal(cfg)) - assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Servers) + assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Connection.Servers) } func TestConfigDefaultElasticsearch(t *testing.T) { diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index e19a14f4b78..a47f50b3f3e 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -17,36 +17,32 @@ import ( gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" ) -// Configuration describes the configuration properties needed to connect to a Cassandra cluster +// Configuration describes the configuration properties needed to connect to a Cassandra cluster. type Configuration struct { - Servers []string `valid:"required,url" mapstructure:"servers"` - Keyspace string `mapstructure:"keyspace"` + Connection Connection `mapstructure:"connection"` + Schema Schema `mapstructure:"schema"` +} + +type Connection struct { + Servers []string `mapstructure:"servers" valid:"required,url" ` LocalDC string `mapstructure:"local_dc"` + Port int `mapstructure:"port"` + DisableAutoDiscovery bool `mapstructure:"disable_auto_discovery"` ConnectionsPerHost int `mapstructure:"connections_per_host"` - Timeout time.Duration `mapstructure:"-"` - ConnectTimeout time.Duration `mapstructure:"connection_timeout"` ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` MaxRetryAttempts int `mapstructure:"max_retry_attempts"` + TLS configtls.ClientConfig `mapstructure:"tls"` + QueryTimeout time.Duration `mapstructure:"query_timeout"` + ConnectTimeout time.Duration `mapstructure:"connection_timeout"` + Authenticator Authenticator `mapstructure:"auth"` ProtoVersion int `mapstructure:"proto_version"` Consistency string `mapstructure:"consistency"` - DisableCompression bool `mapstructure:"disable_compression"` - Port int `mapstructure:"port"` - Authenticator Authenticator `mapstructure:",squash"` - DisableAutoDiscovery bool `mapstructure:"-"` - TLS configtls.ClientConfig `mapstructure:"tls"` } -func DefaultConfiguration() Configuration { - return Configuration{ - Servers: []string{"127.0.0.1"}, - Port: 9042, - MaxRetryAttempts: 3, - Keyspace: "jaeger_v1_test", - ProtoVersion: 4, - ConnectionsPerHost: 2, - ReconnectInterval: 60 * time.Second, - } +type Schema struct { + Keyspace string `mapstructure:"keyspace"` + DisableCompression bool `mapstructure:"disable_compression"` } // Authenticator holds the authentication properties needed to connect to a Cassandra cluster @@ -62,31 +58,48 @@ type BasicAuthenticator struct { AllowedAuthenticators []string `yaml:"allowed_authenticators" mapstructure:"allowed_authenticators"` } +func DefaultConfiguration() Configuration { + return Configuration{ + Connection: Connection{ + Servers: []string{"127.0.0.1"}, + Port: 9042, + MaxRetryAttempts: 3, + ProtoVersion: 4, + ConnectionsPerHost: 2, + ReconnectInterval: 60 * time.Second, + }, + Schema: Schema{ + Keyspace: "jaeger_v1_test", + }, + } +} + // ApplyDefaults copies settings from source unless its own value is non-zero. func (c *Configuration) ApplyDefaults(source *Configuration) { - if c.ConnectionsPerHost == 0 { - c.ConnectionsPerHost = source.ConnectionsPerHost + if c.Connection.ConnectionsPerHost == 0 { + c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost } - if c.MaxRetryAttempts == 0 { - c.MaxRetryAttempts = source.MaxRetryAttempts + if c.Connection.MaxRetryAttempts == 0 { + c.Connection.MaxRetryAttempts = source.Connection.MaxRetryAttempts } - if c.Timeout == 0 { - c.Timeout = source.Timeout + if c.Connection.QueryTimeout == 0 { + c.Connection.QueryTimeout = source.Connection.QueryTimeout } - if c.ReconnectInterval == 0 { - c.ReconnectInterval = source.ReconnectInterval + if c.Connection.ReconnectInterval == 0 { + c.Connection.ReconnectInterval = source.Connection.ReconnectInterval } - if c.Port == 0 { - c.Port = source.Port + if c.Connection.Port == 0 { + c.Connection.Port = source.Connection.Port } - if c.Keyspace == "" { - c.Keyspace = source.Keyspace + if c.Connection.ProtoVersion == 0 { + c.Connection.ProtoVersion = source.Connection.ProtoVersion } - if c.ProtoVersion == 0 { - c.ProtoVersion = source.ProtoVersion + if c.Connection.SocketKeepAlive == 0 { + c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive } - if c.SocketKeepAlive == 0 { - c.SocketKeepAlive = source.SocketKeepAlive + + if c.Schema.Keyspace == "" { + c.Schema.Keyspace = source.Schema.Keyspace } } @@ -110,57 +123,57 @@ func (c *Configuration) NewSession() (cassandra.Session, error) { // NewCluster creates a new gocql cluster from the configuration func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { - cluster := gocql.NewCluster(c.Servers...) - cluster.Keyspace = c.Keyspace - cluster.NumConns = c.ConnectionsPerHost - cluster.Timeout = c.Timeout - cluster.ConnectTimeout = c.ConnectTimeout - cluster.ReconnectInterval = c.ReconnectInterval - cluster.SocketKeepalive = c.SocketKeepAlive - if c.ProtoVersion > 0 { - cluster.ProtoVersion = c.ProtoVersion + cluster := gocql.NewCluster(c.Connection.Servers...) + cluster.Keyspace = c.Schema.Keyspace + cluster.NumConns = c.Connection.ConnectionsPerHost + cluster.Timeout = c.Connection.QueryTimeout + cluster.ConnectTimeout = c.Connection.ConnectTimeout + cluster.ReconnectInterval = c.Connection.ReconnectInterval + cluster.SocketKeepalive = c.Connection.SocketKeepAlive + if c.Connection.ProtoVersion > 0 { + cluster.ProtoVersion = c.Connection.ProtoVersion } - if c.MaxRetryAttempts > 1 { - cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.MaxRetryAttempts - 1} + if c.Connection.MaxRetryAttempts > 1 { + cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Connection.MaxRetryAttempts - 1} } - if c.Port != 0 { - cluster.Port = c.Port + if c.Connection.Port != 0 { + cluster.Port = c.Connection.Port } - if !c.DisableCompression { + if !c.Schema.DisableCompression { cluster.Compressor = gocql.SnappyCompressor{} } - if c.Consistency == "" { + if c.Connection.Consistency == "" { cluster.Consistency = gocql.LocalOne } else { - cluster.Consistency = gocql.ParseConsistency(c.Consistency) + cluster.Consistency = gocql.ParseConsistency(c.Connection.Consistency) } fallbackHostSelectionPolicy := gocql.RoundRobinHostPolicy() - if c.LocalDC != "" { - fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.LocalDC) + if c.Connection.LocalDC != "" { + fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.Connection.LocalDC) } cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallbackHostSelectionPolicy, gocql.ShuffleReplicas()) - if c.Authenticator.Basic.Username != "" && c.Authenticator.Basic.Password != "" { + if c.Connection.Authenticator.Basic.Username != "" && c.Connection.Authenticator.Basic.Password != "" { cluster.Authenticator = gocql.PasswordAuthenticator{ - Username: c.Authenticator.Basic.Username, - Password: c.Authenticator.Basic.Password, - AllowedAuthenticators: c.Authenticator.Basic.AllowedAuthenticators, + Username: c.Connection.Authenticator.Basic.Username, + Password: c.Connection.Authenticator.Basic.Password, + AllowedAuthenticators: c.Connection.Authenticator.Basic.AllowedAuthenticators, } } - tlsCfg, err := c.TLS.LoadTLSConfig(context.Background()) + tlsCfg, err := c.Connection.TLS.LoadTLSConfig(context.Background()) if err != nil { return nil, err } - if !c.TLS.Insecure { + if !c.Connection.TLS.Insecure { cluster.SslOpts = &gocql.SslOptions{ Config: tlsCfg, } } // If tunneling connection to C*, disable cluster autodiscovery features. - if c.DisableAutoDiscovery { + if c.Connection.DisableAutoDiscovery { cluster.DisableInitialHostLookup = true cluster.IgnorePeerAddr = true } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 4f92fe7181a..f2ffd551485 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -195,7 +195,9 @@ func TestNewFactoryWithConfig(t *testing.T) { opts := &Options{ Primary: NamespaceConfig{ Configuration: cassandraCfg.Configuration{ - Servers: []string{"localhost:9200"}, + Connection: cassandraCfg.Connection{ + Servers: []string{"localhost:9200"}, + }, }, }, } @@ -215,7 +217,9 @@ func TestNewFactoryWithConfig(t *testing.T) { opts := &Options{ Primary: NamespaceConfig{ Configuration: cassandraCfg.Configuration{ - Servers: []string{"localhost:9200"}, + Connection: cassandraCfg.Connection{ + Servers: []string{"localhost:9200"}, + }, }, }, } diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index ff7792bf459..77fbbfff29f 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -136,43 +136,43 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { } flagSet.Int( nsConfig.namespace+suffixConnPerHost, - nsConfig.ConnectionsPerHost, + nsConfig.Connection.ConnectionsPerHost, "The number of Cassandra connections from a single backend instance") flagSet.Int( nsConfig.namespace+suffixMaxRetryAttempts, - nsConfig.MaxRetryAttempts, + nsConfig.Connection.MaxRetryAttempts, "The number of attempts when reading from Cassandra") flagSet.Duration( nsConfig.namespace+suffixTimeout, - nsConfig.Timeout, + nsConfig.Connection.QueryTimeout, "Timeout used for queries. A Timeout of zero means no timeout") flagSet.Duration( nsConfig.namespace+suffixConnectTimeout, - nsConfig.ConnectTimeout, + nsConfig.Connection.ConnectTimeout, "Timeout used for connections to Cassandra Servers") flagSet.Duration( nsConfig.namespace+suffixReconnectInterval, - nsConfig.ReconnectInterval, + nsConfig.Connection.ReconnectInterval, "Reconnect interval to retry connecting to downed hosts") flagSet.String( nsConfig.namespace+suffixServers, - strings.Join(nsConfig.Servers, ","), + strings.Join(nsConfig.Connection.Servers, ","), "The comma-separated list of Cassandra servers") flagSet.Int( nsConfig.namespace+suffixPort, - nsConfig.Port, + nsConfig.Connection.Port, "The port for cassandra") flagSet.String( nsConfig.namespace+suffixKeyspace, - nsConfig.Keyspace, + nsConfig.Schema.Keyspace, "The Cassandra keyspace for Jaeger data") flagSet.String( nsConfig.namespace+suffixDC, - nsConfig.LocalDC, + nsConfig.Connection.LocalDC, "The name of the Cassandra local data center for DC Aware host selection") flagSet.String( nsConfig.namespace+suffixConsistency, - nsConfig.Consistency, + nsConfig.Connection.Consistency, "The Cassandra consistency level, e.g. ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE (default LOCAL_ONE)") flagSet.Bool( nsConfig.namespace+suffixDisableCompression, @@ -180,19 +180,19 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { "Disables the use of the default Snappy Compression while connecting to the Cassandra Cluster if set to true. This is useful for connecting to Cassandra Clusters(like Azure Cosmos Db with Cassandra API) that do not support SnappyCompression") flagSet.Int( nsConfig.namespace+suffixProtoVer, - nsConfig.ProtoVersion, + nsConfig.Connection.ProtoVersion, "The Cassandra protocol version") flagSet.Duration( nsConfig.namespace+suffixSocketKeepAlive, - nsConfig.SocketKeepAlive, + nsConfig.Connection.SocketKeepAlive, "Cassandra's keepalive period to use, enabled if > 0") flagSet.String( nsConfig.namespace+suffixUsername, - nsConfig.Authenticator.Basic.Username, + nsConfig.Connection.Authenticator.Basic.Username, "Username for password authentication for Cassandra") flagSet.String( nsConfig.namespace+suffixPassword, - nsConfig.Authenticator.Basic.Password, + nsConfig.Connection.Authenticator.Basic.Password, "Password for password authentication for Cassandra") flagSet.String( nsConfig.namespace+suffixAuth, @@ -228,31 +228,31 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) { if cfg.namespace != primaryStorageConfig { cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) } - cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost) - cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts) - cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) - cfg.ConnectTimeout = v.GetDuration(cfg.namespace + suffixConnectTimeout) - cfg.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval) + cfg.Connection.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost) + cfg.Connection.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts) + cfg.Connection.QueryTimeout = v.GetDuration(cfg.namespace + suffixTimeout) + cfg.Connection.ConnectTimeout = v.GetDuration(cfg.namespace + suffixConnectTimeout) + cfg.Connection.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval) servers := stripWhiteSpace(v.GetString(cfg.namespace + suffixServers)) - cfg.Servers = strings.Split(servers, ",") - cfg.Port = v.GetInt(cfg.namespace + suffixPort) - cfg.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) - cfg.LocalDC = v.GetString(cfg.namespace + suffixDC) - cfg.Consistency = v.GetString(cfg.namespace + suffixConsistency) - cfg.ProtoVersion = v.GetInt(cfg.namespace + suffixProtoVer) - cfg.SocketKeepAlive = v.GetDuration(cfg.namespace + suffixSocketKeepAlive) - cfg.Authenticator.Basic.Username = v.GetString(cfg.namespace + suffixUsername) - cfg.Authenticator.Basic.Password = v.GetString(cfg.namespace + suffixPassword) + cfg.Connection.Servers = strings.Split(servers, ",") + cfg.Connection.Port = v.GetInt(cfg.namespace + suffixPort) + cfg.Schema.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) + cfg.Connection.LocalDC = v.GetString(cfg.namespace + suffixDC) + cfg.Connection.Consistency = v.GetString(cfg.namespace + suffixConsistency) + cfg.Connection.ProtoVersion = v.GetInt(cfg.namespace + suffixProtoVer) + cfg.Connection.SocketKeepAlive = v.GetDuration(cfg.namespace + suffixSocketKeepAlive) + cfg.Connection.Authenticator.Basic.Username = v.GetString(cfg.namespace + suffixUsername) + cfg.Connection.Authenticator.Basic.Password = v.GetString(cfg.namespace + suffixPassword) authentication := stripWhiteSpace(v.GetString(cfg.namespace + suffixAuth)) - cfg.Authenticator.Basic.AllowedAuthenticators = strings.Split(authentication, ",") - cfg.DisableCompression = v.GetBool(cfg.namespace + suffixDisableCompression) + cfg.Connection.Authenticator.Basic.AllowedAuthenticators = strings.Split(authentication, ",") + cfg.Schema.DisableCompression = v.GetBool(cfg.namespace + suffixDisableCompression) var err error tlsCfg, err := tlsFlagsConfig.InitFromViper(v) if err != nil { // TODO refactor to be able to return error log.Fatal(err) } - cfg.TLS = tlsCfg.ToOtelClientConfig() + cfg.Connection.TLS = tlsCfg.ToOtelClientConfig() } // GetPrimary returns primary configuration. @@ -271,8 +271,8 @@ func (opt *Options) Get(namespace string) *config.Configuration { return nil } nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if len(nsCfg.Servers) == 0 { - nsCfg.Servers = opt.Primary.Servers + if len(nsCfg.Connection.Servers) == 0 { + nsCfg.Connection.Servers = opt.Primary.Connection.Servers } return &nsCfg.Configuration } diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index e405f7abc07..338a83425ba 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -17,9 +17,9 @@ import ( func TestOptions(t *testing.T) { opts := NewOptions("foo") primary := opts.GetPrimary() - assert.NotEmpty(t, primary.Keyspace) - assert.NotEmpty(t, primary.Servers) - assert.Equal(t, 2, primary.ConnectionsPerHost) + assert.NotEmpty(t, primary.Schema.Keyspace) + assert.NotEmpty(t, primary.Connection.Servers) + assert.Equal(t, 2, primary.Connection.ConnectionsPerHost) aux := opts.Get("archive") assert.Nil(t, aux) @@ -28,10 +28,10 @@ func TestOptions(t *testing.T) { opts.others["archive"].Enabled = true aux = opts.Get("archive") require.NotNil(t, aux) - assert.Equal(t, primary.Keyspace, aux.Keyspace) - assert.Equal(t, primary.Servers, aux.Servers) - assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost) - assert.Equal(t, primary.ReconnectInterval, aux.ReconnectInterval) + assert.Equal(t, primary.Schema.Keyspace, aux.Schema.Keyspace) + assert.Equal(t, primary.Connection.Servers, aux.Connection.Servers) + assert.Equal(t, primary.Connection.ConnectionsPerHost, aux.Connection.ConnectionsPerHost) + assert.Equal(t, primary.Connection.ReconnectInterval, aux.Connection.ReconnectInterval) } func TestOptionsWithFlags(t *testing.T) { @@ -67,11 +67,11 @@ func TestOptionsWithFlags(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, "jaeger", primary.Keyspace) - assert.Equal(t, "mojave", primary.LocalDC) - assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) - assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.datastax.bdp.cassandra.auth.DseAuthenticator"}, primary.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, "ONE", primary.Consistency) + assert.Equal(t, "jaeger", primary.Schema.Keyspace) + assert.Equal(t, "mojave", primary.Connection.LocalDC) + assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Connection.Servers) + assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.datastax.bdp.cassandra.auth.DseAuthenticator"}, primary.Connection.Authenticator.Basic.AllowedAuthenticators) + assert.Equal(t, "ONE", primary.Connection.Consistency) assert.Equal(t, []string{"blerg", "blarg", "blorg"}, opts.TagIndexBlacklist()) assert.Equal(t, []string{"flerg", "flarg", "florg"}, opts.TagIndexWhitelist()) assert.True(t, opts.Index.Tags) @@ -80,17 +80,17 @@ func TestOptionsWithFlags(t *testing.T) { aux := opts.Get("cas-aux") require.NotNil(t, aux) - assert.Equal(t, "jaeger-archive", aux.Keyspace) - assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) - assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, 42, aux.ConnectionsPerHost) - assert.Equal(t, 42, aux.MaxRetryAttempts) - assert.Equal(t, 42*time.Second, aux.Timeout) - assert.Equal(t, 42*time.Second, aux.ReconnectInterval) - assert.Equal(t, 4242, aux.Port) - assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary") - assert.Equal(t, 3, aux.ProtoVersion) - assert.Equal(t, 42*time.Second, aux.SocketKeepAlive) + assert.Equal(t, "jaeger-archive", aux.Schema.Keyspace) + assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Connection.Servers) + assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Connection.Authenticator.Basic.AllowedAuthenticators) + assert.Equal(t, 42, aux.Connection.ConnectionsPerHost) + assert.Equal(t, 42, aux.Connection.MaxRetryAttempts) + assert.Equal(t, 42*time.Second, aux.Connection.QueryTimeout) + assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval) + assert.Equal(t, 4242, aux.Connection.Port) + assert.Equal(t, "", aux.Connection.Consistency, "aux storage does not inherit consistency from primary") + assert.Equal(t, 3, aux.Connection.ProtoVersion) + assert.Equal(t, 42*time.Second, aux.Connection.SocketKeepAlive) } func TestDefaultTlsHostVerify(t *testing.T) { @@ -102,7 +102,7 @@ func TestDefaultTlsHostVerify(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.False(t, primary.TLS.InsecureSkipVerify) + assert.False(t, primary.Connection.TLS.InsecureSkipVerify) } func TestEmptyBlackWhiteLists(t *testing.T) { diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index c510202ae01..10b02cd5272 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -24,11 +24,15 @@ var logger, _ = zap.NewDevelopment() func main() { noScope := metrics.NullFactory cConfig := &cascfg.Configuration{ - Servers: []string{"127.0.0.1"}, - ConnectionsPerHost: 10, - Timeout: time.Millisecond * 750, - ProtoVersion: 4, - Keyspace: "jaeger_v1_test", + Connection: cascfg.Connection{ + Servers: []string{"127.0.0.1"}, + ConnectionsPerHost: 10, + QueryTimeout: time.Millisecond * 750, + ProtoVersion: 4, + }, + Schema: cascfg.Schema{ + Keyspace: "jaeger_v1_test", + }, } cqlSession, err := cConfig.NewSession() if err != nil { From 7c552280305c45847c08d5d79ef20a517854c2c3 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 19:34:59 -0400 Subject: [PATCH 04/18] Fix Cassandra Test Configuration Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-cassandra.yaml | 24 ++++++++++++++---------- pkg/cassandra/config/config.go | 2 +- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index c5897f7f039..417b633f7e0 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -20,18 +20,22 @@ extensions: backends: some_storage: cassandra: - keyspace: "jaeger_v1_dc1" - username: "cassandra" - password: "cassandra" - tls: - insecure: true + schema: + keyspace: "jaeger_v1_dc1" + connection: + username: "cassandra" + password: "cassandra" + tls: + insecure: true another_storage: cassandra: - keyspace: "jaeger_v1_dc1" - username: "cassandra" - password: "cassandra" - tls: - insecure: true + schema: + keyspace: "jaeger_v1_dc1" + connection: + username: "cassandra" + password: "cassandra" + tls: + insecure: true receivers: otlp: protocols: diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index a47f50b3f3e..3cc0d9e2bb5 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -19,8 +19,8 @@ import ( // Configuration describes the configuration properties needed to connect to a Cassandra cluster. type Configuration struct { - Connection Connection `mapstructure:"connection"` Schema Schema `mapstructure:"schema"` + Connection Connection `mapstructure:"connection"` } type Connection struct { From 66b68e898451eaa01a4fc68a970f042135ccee2a Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 19:45:33 -0400 Subject: [PATCH 05/18] Add Unit Tests For Validate Method Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config_test.go | 46 +++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 pkg/cassandra/config/config_test.go diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go new file mode 100644 index 00000000000..d754d3299e6 --- /dev/null +++ b/pkg/cassandra/config/config_test.go @@ -0,0 +1,46 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidate_ReturnsErrorWhenInvalid(t *testing.T) { + tests := []struct { + name string + cfg *Configuration + }{ + { + name: "missing required fields", + cfg: &Configuration{}, + }, + { + name: "require fields in invalid format", + cfg: &Configuration{ + Connection: Connection{ + Servers: []string{"not a url"}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.cfg.Validate() + require.NoError(t, err) + }) + } +} + +func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) { + + cfg := Configuration{ + Connection: Connection{ + Servers: []string{"localhost:9200"}, + }, + } + + err := cfg.Validate() + require.NoError(t, err) +} From a0184c1a30807f4a1844f7640d4866f63835ffdb Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 19:47:42 -0400 Subject: [PATCH 06/18] Fix Cassandra Configuration Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-cassandra.yaml | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 417b633f7e0..59d37002ece 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -22,9 +22,10 @@ extensions: cassandra: schema: keyspace: "jaeger_v1_dc1" - connection: - username: "cassandra" - password: "cassandra" + connection: + auth: + username: "cassandra" + password: "cassandra" tls: insecure: true another_storage: @@ -32,8 +33,9 @@ extensions: schema: keyspace: "jaeger_v1_dc1" connection: - username: "cassandra" - password: "cassandra" + auth: + username: "cassandra" + password: "cassandra" tls: insecure: true receivers: From 10d848494cf98a34790b8556580971bbbc83cd88 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 19:50:46 -0400 Subject: [PATCH 07/18] Run Formatter Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go index d754d3299e6..d18a6c76f96 100644 --- a/pkg/cassandra/config/config_test.go +++ b/pkg/cassandra/config/config_test.go @@ -1,3 +1,6 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + package config import ( @@ -34,7 +37,6 @@ func TestValidate_ReturnsErrorWhenInvalid(t *testing.T) { } func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) { - cfg := Configuration{ Connection: Connection{ Servers: []string{"localhost:9200"}, From 9f939bb78201f638f58d5460d60be75be4e761c1 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 20:19:10 -0400 Subject: [PATCH 08/18] Fix Test Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go index d18a6c76f96..d20fd534e7a 100644 --- a/pkg/cassandra/config/config_test.go +++ b/pkg/cassandra/config/config_test.go @@ -31,7 +31,7 @@ func TestValidate_ReturnsErrorWhenInvalid(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { err := test.cfg.Validate() - require.NoError(t, err) + require.Error(t, err) }) } } From 110753a705275bf60d3d90d53da74cf0a42b6b1e Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 21:21:10 -0400 Subject: [PATCH 09/18] Remove Squash For Auth Signed-off-by: Mahad Zaryab --- cmd/jaeger/config-cassandra.yaml | 10 ++++++---- pkg/cassandra/config/config.go | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 59d37002ece..b53c59dba9e 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -24,8 +24,9 @@ extensions: keyspace: "jaeger_v1_dc1" connection: auth: - username: "cassandra" - password: "cassandra" + basic: + username: "cassandra" + password: "cassandra" tls: insecure: true another_storage: @@ -34,8 +35,9 @@ extensions: keyspace: "jaeger_v1_dc1" connection: auth: - username: "cassandra" - password: "cassandra" + basic: + username: "cassandra" + password: "cassandra" tls: insecure: true receivers: diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 3cc0d9e2bb5..ac92a47446d 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -47,7 +47,7 @@ type Schema struct { // Authenticator holds the authentication properties needed to connect to a Cassandra cluster type Authenticator struct { - Basic BasicAuthenticator `yaml:"basic" mapstructure:",squash"` + Basic BasicAuthenticator `mapstructure:"basic"` // TODO: add more auth types } From 71d5da37f8fd02e9f6811573a1c95cfd77e9d500 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 9 Sep 2024 21:30:09 -0400 Subject: [PATCH 10/18] Create Separate Grouping For Query Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config.go | 35 +++++++++++-------- plugin/storage/cassandra/options.go | 12 +++---- plugin/storage/cassandra/options_test.go | 4 +-- .../storage/cassandra/savetracetest/main.go | 8 +++-- 4 files changed, 33 insertions(+), 26 deletions(-) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index ac92a47446d..7ca52cd49c8 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -21,6 +21,7 @@ import ( type Configuration struct { Schema Schema `mapstructure:"schema"` Connection Connection `mapstructure:"connection"` + Query Query `mapstructure:"query"` } type Connection struct { @@ -31,10 +32,8 @@ type Connection struct { ConnectionsPerHost int `mapstructure:"connections_per_host"` ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` - MaxRetryAttempts int `mapstructure:"max_retry_attempts"` TLS configtls.ClientConfig `mapstructure:"tls"` - QueryTimeout time.Duration `mapstructure:"query_timeout"` - ConnectTimeout time.Duration `mapstructure:"connection_timeout"` + Timeout time.Duration `mapstructure:"timeout"` Authenticator Authenticator `mapstructure:"auth"` ProtoVersion int `mapstructure:"proto_version"` Consistency string `mapstructure:"consistency"` @@ -45,6 +44,11 @@ type Schema struct { DisableCompression bool `mapstructure:"disable_compression"` } +type Query struct { + Timeout time.Duration `mapstructure:"timeout"` + MaxRetryAttempts int `mapstructure:"max_retry_attempts"` +} + // Authenticator holds the authentication properties needed to connect to a Cassandra cluster type Authenticator struct { Basic BasicAuthenticator `mapstructure:"basic"` @@ -60,16 +64,18 @@ type BasicAuthenticator struct { func DefaultConfiguration() Configuration { return Configuration{ + Schema: Schema{ + Keyspace: "jaeger_v1_test", + }, Connection: Connection{ Servers: []string{"127.0.0.1"}, Port: 9042, - MaxRetryAttempts: 3, ProtoVersion: 4, ConnectionsPerHost: 2, ReconnectInterval: 60 * time.Second, }, - Schema: Schema{ - Keyspace: "jaeger_v1_test", + Query: Query{ + MaxRetryAttempts: 3, }, } } @@ -79,11 +85,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Connection.ConnectionsPerHost == 0 { c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost } - if c.Connection.MaxRetryAttempts == 0 { - c.Connection.MaxRetryAttempts = source.Connection.MaxRetryAttempts + if c.Query.MaxRetryAttempts == 0 { + c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts } - if c.Connection.QueryTimeout == 0 { - c.Connection.QueryTimeout = source.Connection.QueryTimeout + if c.Query.Timeout == 0 { + c.Query.Timeout = source.Query.Timeout } if c.Connection.ReconnectInterval == 0 { c.Connection.ReconnectInterval = source.Connection.ReconnectInterval @@ -97,7 +103,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Connection.SocketKeepAlive == 0 { c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive } - if c.Schema.Keyspace == "" { c.Schema.Keyspace = source.Schema.Keyspace } @@ -126,15 +131,15 @@ func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { cluster := gocql.NewCluster(c.Connection.Servers...) cluster.Keyspace = c.Schema.Keyspace cluster.NumConns = c.Connection.ConnectionsPerHost - cluster.Timeout = c.Connection.QueryTimeout - cluster.ConnectTimeout = c.Connection.ConnectTimeout + cluster.Timeout = c.Query.Timeout + cluster.ConnectTimeout = c.Connection.Timeout cluster.ReconnectInterval = c.Connection.ReconnectInterval cluster.SocketKeepalive = c.Connection.SocketKeepAlive if c.Connection.ProtoVersion > 0 { cluster.ProtoVersion = c.Connection.ProtoVersion } - if c.Connection.MaxRetryAttempts > 1 { - cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Connection.MaxRetryAttempts - 1} + if c.Query.MaxRetryAttempts > 1 { + cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Query.MaxRetryAttempts - 1} } if c.Connection.Port != 0 { cluster.Port = c.Connection.Port diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index 77fbbfff29f..b664e99cbed 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -140,15 +140,15 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { "The number of Cassandra connections from a single backend instance") flagSet.Int( nsConfig.namespace+suffixMaxRetryAttempts, - nsConfig.Connection.MaxRetryAttempts, + nsConfig.Query.MaxRetryAttempts, "The number of attempts when reading from Cassandra") flagSet.Duration( nsConfig.namespace+suffixTimeout, - nsConfig.Connection.QueryTimeout, + nsConfig.Query.Timeout, "Timeout used for queries. A Timeout of zero means no timeout") flagSet.Duration( nsConfig.namespace+suffixConnectTimeout, - nsConfig.Connection.ConnectTimeout, + nsConfig.Connection.Timeout, "Timeout used for connections to Cassandra Servers") flagSet.Duration( nsConfig.namespace+suffixReconnectInterval, @@ -229,9 +229,9 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) { cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled) } cfg.Connection.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost) - cfg.Connection.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts) - cfg.Connection.QueryTimeout = v.GetDuration(cfg.namespace + suffixTimeout) - cfg.Connection.ConnectTimeout = v.GetDuration(cfg.namespace + suffixConnectTimeout) + cfg.Query.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts) + cfg.Query.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) + cfg.Connection.Timeout = v.GetDuration(cfg.namespace + suffixConnectTimeout) cfg.Connection.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval) servers := stripWhiteSpace(v.GetString(cfg.namespace + suffixServers)) cfg.Connection.Servers = strings.Split(servers, ",") diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 338a83425ba..5d0b4b50c5e 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -84,8 +84,8 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Connection.Servers) assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator"}, aux.Connection.Authenticator.Basic.AllowedAuthenticators) assert.Equal(t, 42, aux.Connection.ConnectionsPerHost) - assert.Equal(t, 42, aux.Connection.MaxRetryAttempts) - assert.Equal(t, 42*time.Second, aux.Connection.QueryTimeout) + assert.Equal(t, 42, aux.Query.MaxRetryAttempts) + assert.Equal(t, 42*time.Second, aux.Query.Timeout) assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval) assert.Equal(t, 4242, aux.Connection.Port) assert.Equal(t, "", aux.Connection.Consistency, "aux storage does not inherit consistency from primary") diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index 10b02cd5272..00bfe9b4ed5 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -24,14 +24,16 @@ var logger, _ = zap.NewDevelopment() func main() { noScope := metrics.NullFactory cConfig := &cascfg.Configuration{ + Schema: cascfg.Schema{ + Keyspace: "jaeger_v1_test", + }, Connection: cascfg.Connection{ Servers: []string{"127.0.0.1"}, ConnectionsPerHost: 10, - QueryTimeout: time.Millisecond * 750, ProtoVersion: 4, }, - Schema: cascfg.Schema{ - Keyspace: "jaeger_v1_test", + Query: cascfg.Query{ + Timeout: time.Millisecond * 750, }, } cqlSession, err := cConfig.NewSession() From 8d21a2552ae99935095c5b4a13f9c0152560d071 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 11 Sep 2024 20:01:07 -0400 Subject: [PATCH 11/18] Move Consistency To Query Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config.go | 6 +++--- plugin/storage/cassandra/options.go | 4 ++-- plugin/storage/cassandra/options_test.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 7ca52cd49c8..1f7886417e5 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -36,7 +36,6 @@ type Connection struct { Timeout time.Duration `mapstructure:"timeout"` Authenticator Authenticator `mapstructure:"auth"` ProtoVersion int `mapstructure:"proto_version"` - Consistency string `mapstructure:"consistency"` } type Schema struct { @@ -47,6 +46,7 @@ type Schema struct { type Query struct { Timeout time.Duration `mapstructure:"timeout"` MaxRetryAttempts int `mapstructure:"max_retry_attempts"` + Consistency string `mapstructure:"consistency"` } // Authenticator holds the authentication properties needed to connect to a Cassandra cluster @@ -149,10 +149,10 @@ func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { cluster.Compressor = gocql.SnappyCompressor{} } - if c.Connection.Consistency == "" { + if c.Query.Consistency == "" { cluster.Consistency = gocql.LocalOne } else { - cluster.Consistency = gocql.ParseConsistency(c.Connection.Consistency) + cluster.Consistency = gocql.ParseConsistency(c.Query.Consistency) } fallbackHostSelectionPolicy := gocql.RoundRobinHostPolicy() diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index b664e99cbed..e266436ba29 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -172,7 +172,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig NamespaceConfig) { "The name of the Cassandra local data center for DC Aware host selection") flagSet.String( nsConfig.namespace+suffixConsistency, - nsConfig.Connection.Consistency, + nsConfig.Query.Consistency, "The Cassandra consistency level, e.g. ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE (default LOCAL_ONE)") flagSet.Bool( nsConfig.namespace+suffixDisableCompression, @@ -238,7 +238,7 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) { cfg.Connection.Port = v.GetInt(cfg.namespace + suffixPort) cfg.Schema.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) cfg.Connection.LocalDC = v.GetString(cfg.namespace + suffixDC) - cfg.Connection.Consistency = v.GetString(cfg.namespace + suffixConsistency) + cfg.Query.Consistency = v.GetString(cfg.namespace + suffixConsistency) cfg.Connection.ProtoVersion = v.GetInt(cfg.namespace + suffixProtoVer) cfg.Connection.SocketKeepAlive = v.GetDuration(cfg.namespace + suffixSocketKeepAlive) cfg.Connection.Authenticator.Basic.Username = v.GetString(cfg.namespace + suffixUsername) diff --git a/plugin/storage/cassandra/options_test.go b/plugin/storage/cassandra/options_test.go index 5d0b4b50c5e..8ccd813c838 100644 --- a/plugin/storage/cassandra/options_test.go +++ b/plugin/storage/cassandra/options_test.go @@ -71,7 +71,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "mojave", primary.Connection.LocalDC) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Connection.Servers) assert.Equal(t, []string{"org.apache.cassandra.auth.PasswordAuthenticator", "com.datastax.bdp.cassandra.auth.DseAuthenticator"}, primary.Connection.Authenticator.Basic.AllowedAuthenticators) - assert.Equal(t, "ONE", primary.Connection.Consistency) + assert.Equal(t, "ONE", primary.Query.Consistency) assert.Equal(t, []string{"blerg", "blarg", "blorg"}, opts.TagIndexBlacklist()) assert.Equal(t, []string{"flerg", "flarg", "florg"}, opts.TagIndexWhitelist()) assert.True(t, opts.Index.Tags) @@ -88,7 +88,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, 42*time.Second, aux.Query.Timeout) assert.Equal(t, 42*time.Second, aux.Connection.ReconnectInterval) assert.Equal(t, 4242, aux.Connection.Port) - assert.Equal(t, "", aux.Connection.Consistency, "aux storage does not inherit consistency from primary") + assert.Equal(t, "", aux.Query.Consistency, "aux storage does not inherit consistency from primary") assert.Equal(t, 3, aux.Connection.ProtoVersion) assert.Equal(t, 42*time.Second, aux.Connection.SocketKeepAlive) } From cef8e82d92858aafacde22ccb6cdd85b246fc9a6 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 11 Sep 2024 21:32:40 -0400 Subject: [PATCH 12/18] Add Documentation For Fields Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config.go | 57 +++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 1f7886417e5..90e5e026c97 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -25,37 +25,58 @@ type Configuration struct { } type Connection struct { - Servers []string `mapstructure:"servers" valid:"required,url" ` - LocalDC string `mapstructure:"local_dc"` - Port int `mapstructure:"port"` - DisableAutoDiscovery bool `mapstructure:"disable_auto_discovery"` - ConnectionsPerHost int `mapstructure:"connections_per_host"` - ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` - SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` - TLS configtls.ClientConfig `mapstructure:"tls"` - Timeout time.Duration `mapstructure:"timeout"` - Authenticator Authenticator `mapstructure:"auth"` - ProtoVersion int `mapstructure:"proto_version"` + // Servers contains a list of hosts that are used to connect to the cluster. + Servers []string `mapstructure:"servers" valid:"required,url"` + // LocalDC contains the name of the local Data Center (DC) for DC-aware host selection + LocalDC string `mapstructure:"local_dc"` + // The port used when dialing to a cluster. + Port int `mapstructure:"port"` + // DisableAutoDiscovery, if set to true, will disable the cluster's auto-discovery features. + DisableAutoDiscovery bool `mapstructure:"disable_auto_discovery"` + // ConnectionsPerHost contains the maximum number of open connections for each host on the cluster. + ConnectionsPerHost int `mapstructure:"connections_per_host"` + // ReconnectInterval contains the regular interval after which the driver tries to connect to + // nodes that are down. + ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` + // SocketKeepAlive contains the keep alive period for the default dialer to the cluster. + SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` + // TLS contains the TLS configuration for the connection to the cluster. + TLS configtls.ClientConfig `mapstructure:"tls"` + // Timeout contains the maximum time spent to connect to a cluster. + Timeout time.Duration `mapstructure:"timeout"` + // Authenticator contains the details of the authentication mechanism that is used for + // connecting to a cluster. + Authenticator Authenticator `mapstructure:"auth"` + // ProtoVersion contains the version of the native protocol to use when connecting to a cluster. + ProtoVersion int `mapstructure:"proto_version"` } type Schema struct { - Keyspace string `mapstructure:"keyspace"` - DisableCompression bool `mapstructure:"disable_compression"` + // Keyspace contains the namespace where Jaeger data will be stored. + Keyspace string `mapstructure:"keyspace"` + // DisableCompression, if set to true, disables the use of the default Snappy Compression + // while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB, + // that do not support SnappyCompression. + DisableCompression bool `mapstructure:"disable_compression"` } type Query struct { - Timeout time.Duration `mapstructure:"timeout"` - MaxRetryAttempts int `mapstructure:"max_retry_attempts"` - Consistency string `mapstructure:"consistency"` + // Timeout contains the maximum time spent executing a query. + Timeout time.Duration `mapstructure:"timeout"` + // MaxRetryAttempts indicates the maximum number of times a query will be retried for execution. + MaxRetryAttempts int `mapstructure:"max_retry_attempts"` + // Consistency specifies the consistency level which needs to be satisified before responding + // to a query. + Consistency string `mapstructure:"consistency"` } -// Authenticator holds the authentication properties needed to connect to a Cassandra cluster +// Authenticator holds the authentication properties needed to connect to a Cassandra cluster. type Authenticator struct { Basic BasicAuthenticator `mapstructure:"basic"` // TODO: add more auth types } -// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster +// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster. type BasicAuthenticator struct { Username string `yaml:"username" mapstructure:"username"` Password string `yaml:"password" mapstructure:"password" json:"-"` From 898debd538608ba300a5557b610a9b6988aa61cf Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 11 Sep 2024 21:44:06 -0400 Subject: [PATCH 13/18] Group Configurations In Assignment Methods Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 90e5e026c97..22b16e6c608 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -103,15 +103,12 @@ func DefaultConfiguration() Configuration { // ApplyDefaults copies settings from source unless its own value is non-zero. func (c *Configuration) ApplyDefaults(source *Configuration) { + if c.Schema.Keyspace == "" { + c.Schema.Keyspace = source.Schema.Keyspace + } if c.Connection.ConnectionsPerHost == 0 { c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost } - if c.Query.MaxRetryAttempts == 0 { - c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts - } - if c.Query.Timeout == 0 { - c.Query.Timeout = source.Query.Timeout - } if c.Connection.ReconnectInterval == 0 { c.Connection.ReconnectInterval = source.Connection.ReconnectInterval } @@ -124,8 +121,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Connection.SocketKeepAlive == 0 { c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive } - if c.Schema.Keyspace == "" { - c.Schema.Keyspace = source.Schema.Keyspace + if c.Query.MaxRetryAttempts == 0 { + c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts + } + if c.Query.Timeout == 0 { + c.Query.Timeout = source.Query.Timeout } } @@ -152,10 +152,10 @@ func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { cluster := gocql.NewCluster(c.Connection.Servers...) cluster.Keyspace = c.Schema.Keyspace cluster.NumConns = c.Connection.ConnectionsPerHost - cluster.Timeout = c.Query.Timeout cluster.ConnectTimeout = c.Connection.Timeout cluster.ReconnectInterval = c.Connection.ReconnectInterval cluster.SocketKeepalive = c.Connection.SocketKeepAlive + cluster.Timeout = c.Query.Timeout if c.Connection.ProtoVersion > 0 { cluster.ProtoVersion = c.Connection.ProtoVersion } From 0c3ddcbb45f5f19be2143c191dab139734290d34 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 11 Sep 2024 22:05:23 -0400 Subject: [PATCH 14/18] Remove Redundant YAML Tags Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 22b16e6c608..668ebe06332 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -78,9 +78,9 @@ type Authenticator struct { // BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster. type BasicAuthenticator struct { - Username string `yaml:"username" mapstructure:"username"` - Password string `yaml:"password" mapstructure:"password" json:"-"` - AllowedAuthenticators []string `yaml:"allowed_authenticators" mapstructure:"allowed_authenticators"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + AllowedAuthenticators []string `mapstructure:"allowed_authenticators"` } func DefaultConfiguration() Configuration { From 9c705d1f26a9f72ad8ccddcaf1732bad4329f13c Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 25 Sep 2024 20:05:31 -0400 Subject: [PATCH 15/18] Move TLS Loading Inside Insecure Check Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 668ebe06332..6bab9c75da4 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -189,11 +189,11 @@ func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { AllowedAuthenticators: c.Connection.Authenticator.Basic.AllowedAuthenticators, } } - tlsCfg, err := c.Connection.TLS.LoadTLSConfig(context.Background()) - if err != nil { - return nil, err - } if !c.Connection.TLS.Insecure { + tlsCfg, err := c.Connection.TLS.LoadTLSConfig(context.Background()) + if err != nil { + return nil, err + } cluster.SslOpts = &gocql.SslOptions{ Config: tlsCfg, } From ef27a3c74033ffdec532bfb3546c6330381a3e65 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Sep 2024 21:17:30 -0400 Subject: [PATCH 16/18] add tests Signed-off-by: Yuri Shkuro --- pkg/cassandra/config/config_test.go | 48 +++++++++++++++++++ .../config/{empty_test.go => package_test.go} | 0 2 files changed, 48 insertions(+) rename pkg/cassandra/config/{empty_test.go => package_test.go} (100%) diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go index d20fd534e7a..01677174dd1 100644 --- a/pkg/cassandra/config/config_test.go +++ b/pkg/cassandra/config/config_test.go @@ -6,6 +6,8 @@ package config import ( "testing" + "github.com/crossdock/crossdock-go/assert" + "github.com/gocql/gocql" "github.com/stretchr/testify/require" ) @@ -46,3 +48,49 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) { err := cfg.Validate() require.NoError(t, err) } + +func TestNewClusterWithDefaults(t *testing.T) { + cfg := DefaultConfiguration() + cl, err := cfg.NewCluster() + require.NoError(t, err) + assert.NotEmpty(t, cl.Keyspace) +} + +func TestNewClusterWithOverrides(t *testing.T) { + cfg := DefaultConfiguration() + cfg.Query.Consistency = "LOCAL_QUORUM" + cfg.Connection.LocalDC = "local_dc" + cfg.Connection.Authenticator.Basic.Username = "username" + cfg.Connection.Authenticator.Basic.Password = "password" + cfg.Connection.TLS.Insecure = false + cfg.Connection.DisableAutoDiscovery = true + cl, err := cfg.NewCluster() + require.NoError(t, err) + assert.NotEmpty(t, cl.Keyspace) + assert.Equal(t, cl.Consistency, gocql.LocalQuorum) + assert.NotNil(t, cl.PoolConfig.HostSelectionPolicy, "local_dc") + require.IsType(t, gocql.PasswordAuthenticator{}, cl.Authenticator) + auth := cl.Authenticator.(gocql.PasswordAuthenticator) + assert.Equal(t, auth.Username, "username") + assert.Equal(t, auth.Password, "password") + assert.NotNil(t, cl.SslOpts) + assert.True(t, cl.DisableInitialHostLookup) +} + +func TestApplyDefaults(t *testing.T) { + cfg1 := DefaultConfiguration() + cfg2 := Configuration{} + cfg2.ApplyDefaults(&cfg1) + assert.Equal(t, cfg2.Schema, cfg1.Schema) + assert.Equal(t, cfg2.Query, cfg1.Query) + assert.NotEqual(t, cfg2.Connection.Servers, cfg1.Connection.Servers, "servers not copied") + cfg1.Connection.Servers = nil + assert.Equal(t, cfg2.Connection, cfg1.Connection) +} + +func TestToString(t *testing.T) { + cfg := DefaultConfiguration() + cfg.Schema.Keyspace = "test" + s := cfg.String() + assert.Contains(t, s, "Keyspace:test") +} diff --git a/pkg/cassandra/config/empty_test.go b/pkg/cassandra/config/package_test.go similarity index 100% rename from pkg/cassandra/config/empty_test.go rename to pkg/cassandra/config/package_test.go From f2cc9d93af4d9cb3269d9797a6cf273c8c63a6d2 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 25 Sep 2024 21:22:55 -0400 Subject: [PATCH 17/18] fix-lint Signed-off-by: Yuri Shkuro --- pkg/cassandra/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go index 01677174dd1..19f69a3a087 100644 --- a/pkg/cassandra/config/config_test.go +++ b/pkg/cassandra/config/config_test.go @@ -6,8 +6,8 @@ package config import ( "testing" - "github.com/crossdock/crossdock-go/assert" "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) From 2f1f0e91ec6f5e244584e178e91bb47567913e21 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 26 Sep 2024 00:00:12 -0400 Subject: [PATCH 18/18] Fix Linting Signed-off-by: Mahad Zaryab --- pkg/cassandra/config/config_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go index 19f69a3a087..8d532f65c60 100644 --- a/pkg/cassandra/config/config_test.go +++ b/pkg/cassandra/config/config_test.go @@ -67,12 +67,12 @@ func TestNewClusterWithOverrides(t *testing.T) { cl, err := cfg.NewCluster() require.NoError(t, err) assert.NotEmpty(t, cl.Keyspace) - assert.Equal(t, cl.Consistency, gocql.LocalQuorum) + assert.Equal(t, gocql.LocalQuorum, cl.Consistency) assert.NotNil(t, cl.PoolConfig.HostSelectionPolicy, "local_dc") require.IsType(t, gocql.PasswordAuthenticator{}, cl.Authenticator) auth := cl.Authenticator.(gocql.PasswordAuthenticator) - assert.Equal(t, auth.Username, "username") - assert.Equal(t, auth.Password, "password") + assert.Equal(t, "username", auth.Username) + assert.Equal(t, "password", auth.Password) assert.NotNil(t, cl.SslOpts) assert.True(t, cl.DisableInitialHostLookup) }