Skip to content

Commit

Permalink
Consolidate apiHost & host for RabbitMQ scaler
Browse files Browse the repository at this point in the history
closes kedacore#711

Signed-off-by: Tomek Urbaszek <[email protected]>
  • Loading branch information
turbaszek committed Sep 9, 2020
1 parent 82a4f89 commit 2330458
Showing 1 changed file with 30 additions and 39 deletions.
69 changes: 30 additions & 39 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ const (
rabbitQueueLengthMetricName = "queueLength"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitIncludeUnacked = "includeUnacked"
defaultIncludeUnacked = false
)

const (
httpProtocol = "http"
amqpProtocol = "amqp"
defaultProtocol = amqpProtocol
)

type rabbitMQScaler struct {
Expand All @@ -34,11 +38,10 @@ type rabbitMQScaler struct {
}

type rabbitMQMetadata struct {
queueName string
host string // connection string for AMQP protocol
apiHost string // connection string for management API requests
queueLength int
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host
queueName string
host string // connection string for either HTTP or AMQP protocol
queueLength int
protocol string // either http or amqp protocol
}

type queueInfo struct {
Expand All @@ -56,7 +59,7 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca
return nil, fmt.Errorf("error parsing rabbitmq metadata: %s", err)
}

if meta.includeUnacked {
if meta.protocol == httpProtocol {
return &rabbitMQScaler{metadata: meta}, nil
}

Expand All @@ -75,47 +78,35 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca
func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) (*rabbitMQMetadata, error) {
meta := rabbitMQMetadata{}

meta.includeUnacked = defaultIncludeUnacked
if val, ok := metadata[rabbitIncludeUnacked]; ok {
includeUnacked, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("includeUnacked parsing error %s", err.Error())
// Resolve protocol type
meta.protocol = defaultProtocol
if val, ok := metadata["protocol"]; ok {
if val == amqpProtocol || val == httpProtocol {
meta.protocol = val
} else {
return nil, fmt.Errorf("the protocol has to be either `%s` or `%s` but is `%s`", amqpProtocol, httpProtocol, val)
}
meta.includeUnacked = includeUnacked
}

if meta.includeUnacked {
if authParams["apiHost"] != "" {
meta.apiHost = authParams["apiHost"]
} else if metadata["apiHost"] != "" {
meta.apiHost = metadata["apiHost"]
} else if metadata["apiHostFromEnv"] != "" {
meta.apiHost = resolvedEnv[metadata["apiHostFromEnv"]]
}

if meta.apiHost == "" {
return nil, fmt.Errorf("no apiHost setting given")
}
// Resolve host value
if authParams["host"] != "" {
meta.host = authParams["host"]
} else if metadata["host"] != "" {
meta.host = metadata["host"]
} else if metadata["hostFromEnv"] != "" {
meta.host = resolvedEnv[metadata["hostFromEnv"]]
} else {
if authParams["host"] != "" {
meta.host = authParams["host"]
} else if metadata["host"] != "" {
meta.host = metadata["host"]
} else if metadata["hostFromEnv"] != "" {
meta.host = resolvedEnv[metadata["hostFromEnv"]]
}

if meta.host == "" {
return nil, fmt.Errorf("no host setting given")
}
return nil, fmt.Errorf("no host setting given")
}

// Resolve queueName
if val, ok := metadata["queueName"]; ok {
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

// Resolve queueLength
if val, ok := metadata[rabbitQueueLengthMetricName]; ok {
queueLength, err := strconv.Atoi(val)
if err != nil {
Expand Down Expand Up @@ -167,7 +158,7 @@ func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) {
}

func (s *rabbitMQScaler) getQueueMessages() (int, error) {
if s.metadata.includeUnacked {
if s.metadata.protocol == httpProtocol {
info, err := s.getQueueInfoViaHTTP()
if err != nil {
return -1, err
Expand Down Expand Up @@ -202,7 +193,7 @@ func getJSON(url string, target interface{}) error {
}

func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
parsedURL, err := url.Parse(s.metadata.apiHost)
parsedURL, err := url.Parse(s.metadata.host)

if err != nil {
return nil, err
Expand Down

0 comments on commit 2330458

Please sign in to comment.