Skip to content

Commit

Permalink
Add support when JetStream cluster not on kubernetes
Browse files Browse the repository at this point in the history
Signed-off-by: Muhammad Fadhlika <[email protected]>
  • Loading branch information
mfadhlika committed Mar 2, 2023
1 parent 1a90325 commit d678015
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Here is an overview of all new **experimental** features:
- **Datadog Scaler**: Return correct error when getting a 429 error ([#4187](https://github.com/kedacore/keda/issues/4187))
- **Kafka Scaler**: Return error if the processing of the partition lag fails ([#4098](https://github.com/kedacore/keda/issues/4098))
- **Kafka Scaler**: Support 0 in activationLagThreshold configuration ([#4137](https://github.com/kedacore/keda/issues/4137))
- **NATS Jetstream Scaler:** Fix compatibility when cluster not on kubernetes ([#4101](https://github.com/kedacore/keda/issues/4101))
- **Prometheus Metrics**: Expose Prometheus Metrics also when getting ScaledObject state ([#4075](https://github.com/kedacore/keda/issues/4075))
- **Redis Scalers**: Fix panic produced by incorrect logger initialization ([#4197](https://github.com/kedacore/keda/issues/4197))

Expand Down
98 changes: 66 additions & 32 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -52,12 +53,8 @@ type jetStreamEndpointResponse struct {
}

type jetStreamServerEndpointResponse struct {
Cluster jetStreamCluster `json:"cluster"`
ServerName string `json:"server_name"`
}

type jetStreamCluster struct {
HostUrls []string `json:"urls"`
ConnectUrls []string `json:"connect_urls"`
ServerName string `json:"server_name"`
}

type accountDetail struct {
Expand Down Expand Up @@ -214,12 +211,9 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
}

if s.metadata.clusterSize > 1 {
// we know who the consumer leader is, query it directly
if s.metadata.consumerLeader != "" {
natsJetStreamMonitoringLeaderURL, err := s.getNATSJetStreamMonitoringNodeURL(s.metadata.consumerLeader)
if err != nil {
return err
}
// we know who the consumer leader and its monitoring url is, query it directly
if s.metadata.consumerLeader != "" && s.metadata.monitoringLeaderURL != "" {
natsJetStreamMonitoringLeaderURL := s.metadata.monitoringLeaderURL

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL)
if err != nil {
Expand All @@ -231,32 +225,35 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
}

// we haven't found the consumer yet, grab the list of hosts and try each one
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL()
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL("")
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil)
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL)
return err
}
for _, clusterURL := range jetStreamServerResp.ConnectUrls {
// get hostname from the url
// nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local, or
// 172.0.1.3:4221 -> 172.0.1.3
nodeHostname := strings.Split(clusterURL, ":")[0]
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname)
if err != nil {
return err
}

defer resp.Body.Close()
var jetStreamServerResp *jetStreamServerEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil {
s.logger.Error(err, "unable to decode NATS JetStream server details")
return err
}
// Query server info to get its name
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}

node := jetStreamServerResp.ServerName

for _, clusterURL := range jetStreamServerResp.Cluster.HostUrls {
node := strings.Split(clusterURL, ".")[0]
natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(node)
natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(nodeHostname)
if err != nil {
return err
}
Expand Down Expand Up @@ -320,6 +317,28 @@ func (s *natsJetStreamScaler) invalidateNATSJetStreamCachedMonitoringData() {
s.stream = nil
}

func (s *natsJetStreamScaler) getNATSJetstreamServerInfo(ctx context.Context, natsJetStreamMonitoringServerURL string) (*jetStreamServerEndpointResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil)
if err != nil {
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL)
return nil, err
}

defer resp.Body.Close()
var jetStreamServerResp *jetStreamServerEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil {
s.logger.Error(err, "unable to decode NATS JetStream server details")
return nil, err
}

return jetStreamServerResp, nil
}

func (s *natsJetStreamScaler) getNATSJetstreamMonitoringRequest(ctx context.Context, natsJetStreamMonitoringURL string) (*jetStreamEndpointResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil)
if err != nil {
Expand Down Expand Up @@ -349,22 +368,37 @@ func getNATSJetStreamMonitoringURL(useHTTPS bool, natsServerEndpoint string, acc
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account)
}

func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL() (string, error) {
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL(nodeHostname string) (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
s.logger.Error(err, "unable to parse monitoring URL to create server URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}
return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, jsURL.Host), nil

host := jsURL.Host
if nodeHostname != "" {
host = nodeHostname

if port := jsURL.Port(); port != "" {
host = net.JoinHostPort(host, port)
}
}

return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, host), nil
}

func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(node string) (string, error) {
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(nodeHostname string) (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
s.logger.Error(err, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}
return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, node, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil

if port := jsURL.Port(); port != "" {
nodeHostname = net.JoinHostPort(nodeHostname, port)
}

return fmt.Sprintf("%s://%s%s?%s", jsURL.Scheme, nodeHostname, jsURL.Path, jsURL.RawQuery), nil
}

func (s *natsJetStreamScaler) getMaxMsgLag() int64 {
Expand Down
17 changes: 1 addition & 16 deletions pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,6 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{
}},
}},
}, true, false},
{
"Fail - Bad leader name (clustered)",
&natsJetStreamMetricIdentifier{
&parseNATSJetStreamMetadataTestData{
testNATSJetStreamGoodMetadata, map[string]string{}, false},
0, "s0-nats-jetstream-mystream",
},
&jetStreamEndpointResponse{
MetaCluster: metaCluster{ClusterSize: 3},
Accounts: []accountDetail{{Name: "$G",
Streams: []*streamDetail{{Name: "mystream",
Consumers: []consumerDetail{{Name: "pull_consumer", NumPending: 100, Cluster: consumerCluster{Leader: "leaderBad!!!!"}}},
}},
}},
}, false, true},
{
"Not Active - consumer missing - connected to node without consumer info (clustered)",
&natsJetStreamMetricIdentifier{
Expand Down Expand Up @@ -431,7 +416,7 @@ func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) {

mockJetStreamScaler.metadata.monitoringURL = "234234:::::34234234;;;;really_bad_URL;;/"

_, err = mockJetStreamScaler.getNATSJetStreamMonitoringServerURL()
_, err = mockJetStreamScaler.getNATSJetStreamMonitoringServerURL("")
if err == nil {
t.Error("Expected error for parsing monitoring server URL but got success")
}
Expand Down

0 comments on commit d678015

Please sign in to comment.