Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change how number of pending messages is calculated and add more error handling. #533

Merged
merged 12 commits into from
Jan 7, 2020
53 changes: 43 additions & 10 deletions pkg/scalers/stan_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,30 +108,51 @@ func parseStanMetadata(metadata map[string]string) (stanMetadata, error) {

// IsActive determines if we need to scale from zero
func (s *stanScaler) IsActive(ctx context.Context) (bool, error) {
resp, err := http.Get(s.getMonitoringEndpoint())
monitoringEndpoint := s.getMonitoringEndpoint()

resp, err := http.Get(monitoringEndpoint)
if err != nil {
stanLog.Error(err, "Unable to access the nats streaming broker monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
return false, err
}

if resp.StatusCode == 404 {
baseResp, _ := http.Get(s.getSTANChannelsEndpoint())

if baseResp.StatusCode == 404 {
stanLog.Info("Streaming broker endpoint returned 404. Please ensure it has been created", "url", monitoringEndpoint, "channelName", s.metadata.subject)

} else {
stanLog.Info("Unable to connect to STAN. Please ensure you have configured the ScaledObject with the correct endpoint.", "baseResp.StatusCode", baseResp.StatusCode, "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
}

return false, err
}

defer resp.Body.Close()
json.NewDecoder(resp.Body).Decode(&s.channelInfo)

return s.hasPendingMessage() || s.getMaxMsgLag() > 0, nil
}

func (s *stanScaler) getSTANChannelsEndpoint() string {
return "http://" + s.metadata.natsServerMonitoringEndpoint + "/streaming/channelsz"
}

func (s *stanScaler) getMonitoringEndpoint() string {
return "http://" + s.metadata.natsServerMonitoringEndpoint + "/streaming/channelsz?" + "channel=" + s.metadata.subject + "&subs=1"
return s.getSTANChannelsEndpoint() + "?channel=" + s.metadata.subject + "&subs=1"
}

func (s *stanScaler) getTotalMessages() int64 {
return s.channelInfo.MsgCount
return s.channelInfo.LastSequence
cwoolum marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *stanScaler) getMaxMsgLag() int64 {
var maxValue int64
maxValue = 0
maxValue := int64(0)
combinedQueueName := s.metadata.durableName + ":" + s.metadata.queueGroup

for _, subs := range s.channelInfo.Subscriber {
if subs.LastSent > maxValue && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
if subs.LastSent > maxValue && subs.QueueName == combinedQueueName {
maxValue = subs.LastSent
}
}
cwoolum marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -140,14 +161,26 @@ func (s *stanScaler) getMaxMsgLag() int64 {
}

func (s *stanScaler) hasPendingMessage() bool {
var hasPending bool
hasPending = false
hasPending := false
subscriberFound := false
combinedQueueName := s.metadata.durableName + ":" + s.metadata.queueGroup

for _, subs := range s.channelInfo.Subscriber {
if subs.PendingCount > 0 && subs.QueueName == (s.metadata.durableName+":"+s.metadata.queueGroup) {
hasPending = true
if subs.QueueName == combinedQueueName {
subscriberFound = true

if subs.PendingCount > 0 {
hasPending = true
}

break
}
}

if !subscriberFound {
stanLog.Info("The STAN subscription was not found.", "combinedQueueName", combinedQueueName)
}

return hasPending
}

Expand Down