Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
fix(distributor): Fixed reconnection issue of (re)used ce clients (#7109)
Browse files Browse the repository at this point in the history

* fixed err message and set reconnection attempts to indefinite

Signed-off-by: warber <[email protected]>

* added unit test

Signed-off-by: warber <[email protected]>

* hardened test

Signed-off-by: warber <[email protected]>
  • Loading branch information
warber authored Mar 11, 2022
1 parent e51d6c9 commit 9b69d64
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 7 deletions.
9 changes: 4 additions & 5 deletions distributor/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/keptn/go-utils/pkg/lib/v0_2_0"
"github.com/keptn/keptn/distributor/pkg/config"
"github.com/keptn/keptn/distributor/pkg/utils"
"github.com/nats-io/nats.go"
logger "github.com/sirupsen/logrus"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -128,7 +129,7 @@ func (f *Forwarder) forwardEventToNATSServer(event cloudevents.Event) error {
cloudevents.WithEncodingStructured(context.Background())

if result := c.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
logger.Errorf("Failed to send cloud event: %v", err)
logger.Errorf("Failed to send cloud event: %v", result.Error())
} else {
logger.Infof("Sent: %s, accepted: %t", event.ID(), cloudevents.IsACK(result))
}
Expand All @@ -149,18 +150,16 @@ func (f *Forwarder) forwardEventToAPI(event cloudevents.Event) error {

// createPubSubConnection returns the cenats.Sender cached for topic in
// f.pubSubConnections, creating it with cenats.NewSender against
// f.env.PubSubURL on first use. An empty topic yields an error.
//
// NOTE(review): this span is rendered from a diff, so the removed and the added
// variant of a changed statement appear back to back (the errors.New call and
// the cenats.NewSender call).
//
// NOTE(review): when cenats.NewSender fails, the error is only logged; whatever
// p it returned (presumably nil) is still stored in the map and handed back
// together with a nil error. Confirm that callers cope with a nil sender.
func (f *Forwarder) createPubSubConnection(topic string) (*cenats.Sender, error) {
if topic == "" {
return nil, errors.New("no" +
" PubSub Topic defined")
return nil, errors.New("no PubSub Topic defined")
}

// Create the sender lazily: one connection per topic, reused afterwards.
if f.pubSubConnections[topic] == nil {
p, err := cenats.NewSender(f.env.PubSubURL, topic, cenats.NatsOptions())
// The added variant passes nats.MaxReconnects(-1); per the commit message this
// sets the reconnection attempts to indefinite.
p, err := cenats.NewSender(f.env.PubSubURL, topic, cenats.NatsOptions(nats.MaxReconnects(-1)))
if err != nil {
logger.Errorf("Failed to create nats protocol, %v", err)
}
f.pubSubConnections[topic] = p
}

return f.pubSubConnections[topic], nil
}

Expand Down
97 changes: 96 additions & 1 deletion distributor/pkg/forwarder/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,94 @@ const taskFinishedEvent = `{
"id": "5de83495-4f83-481c-8dbe-fcceb2e0243b",
"source": "my-service",
"specversion": "1.0",
"type": "sh.keptn.events.task.fnished",
"type": "sh.keptn.events.task.finished",
"shkeptncontext": "c9ffbbb-6e1d-4789-9fee-6e63b4bcc1fb"
}`

// Test_NATSDown checks whether the (re)used client connection of a topic
// survives a NATS outage.
//
// It sends two events through the forwarder, restarts the embedded NATS server
// and sends the same two events again. Before the restart, the cached
// connection of the "started" topic is limited to a single reconnect attempt,
// so only the "finished" event is expected to arrive afterwards.
func Test_NATSDown(t *testing.T) {
	const natsTestPort = 8369
	const startedTopic = "sh.keptn.events.task.started"
	const finishedTopic = "sh.keptn.events.task.finished"

	// NOTE(review): both flags are written in the subscription callback and read
	// by the test. Presumably the callback runs on a goroutine owned by the NATS
	// client, which would make this unsynchronized access - consider atomics.
	event1Received := false
	event2Received := false

	svr, shutdownNats := runNATSServerOnPort(natsTestPort)
	defer shutdownNats()

	cfg := config.EnvConfig{}
	// Best effort, as before: the PubSubURL is overridden right below.
	_ = envconfig.Process("", &cfg)
	cfg.PubSubURL = svr.Addr().String()

	natsClient, err := nats.Connect(svr.Addr().String())
	if err != nil {
		// Nothing below can work without a subscriber, so stop right here
		// instead of continuing with an unusable client.
		t.Fatalf("could not initialize nats client: %s", err.Error())
	}
	defer natsClient.Close()

	_, err = natsClient.Subscribe("sh.keptn.events.task.*", func(m *nats.Msg) {
		switch m.Subject {
		case startedTopic:
			event1Received = true
		case finishedTopic:
			event2Received = true
		}
	})
	if err != nil {
		t.Fatalf("could not subscribe to task events: %s", err.Error())
	}

	apiset, err := keptnapi.New(config.DefaultShipyardControllerBaseURL)
	if err != nil {
		t.Fatalf("could not create keptn api set: %s", err.Error())
	}
	f := &Forwarder{
		EventChannel:      make(chan cloudevents.Event),
		keptnEventAPI:     apiset.APIV1(),
		httpClient:        &http.Client{},
		pubSubConnections: map[string]*cenats.Sender{},
		env:               cfg,
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Make sure the forwarder is told to stop even if the test aborts early.
	defer cancel()
	executionContext := utils.NewExecutionContext(ctx, 1)
	go f.Start(executionContext)
	time.Sleep(2 * time.Second)

	// send events to forwarder
	eventFromService(taskStartedEvent)
	eventFromService(taskFinishedEvent)

	assert.Eventually(t, func() bool { return event1Received && event2Received }, time.Second*time.Duration(10), time.Second)

	// Change the max reconnect attempts from indefinite (-1) to 1 to indirectly
	// test what would happen if we use a connection which is stale/not usable
	// anymore. A bit hacky, but it tests the behavior of the cloudevents library.
	startedSender := f.pubSubConnections[startedTopic]
	if startedSender == nil || startedSender.Conn == nil {
		// Without a cached connection the first send never went through NATS;
		// fail with a message instead of a nil pointer panic.
		t.Fatalf("no usable pub/sub connection cached for topic %s", startedTopic)
	}
	startedSender.Conn.Opts.MaxReconnect = 1
	startedSender.Conn.Opts.ReconnectWait = 10 * time.Millisecond

	// shut down the embedded NATS cluster
	shutdownNats()
	svr.WaitForShutdown()

	// wait until we exceed max reconnect time
	time.Sleep(100 * time.Millisecond)

	// restart embedded NATS cluster
	svr, shutdownNats = runNATSServerOnPort(natsTestPort)
	defer shutdownNats()
	time.Sleep(2 * time.Second)

	// reset flags for checking event reception
	event1Received = false
	event2Received = false

	// send events to forwarder
	eventFromService(taskStartedEvent)
	eventFromService(taskFinishedEvent)

	// Check that this time event2 was received because the used connection did
	// reconnect, whereas event1 was not received because its connection did not.
	assert.Eventually(t, func() bool { return !event1Received && event2Received }, time.Second*time.Duration(10), time.Second)

	cancel()
	executionContext.Wg.Wait()
}

func Test_ForwardEventsToNATS(t *testing.T) {
expectedReceivedMessageCount := 0

Expand Down Expand Up @@ -165,6 +249,17 @@ func eventFromService(event string) {
http.Post(fmt.Sprintf("http://127.0.0.1:%d/event", 8081), "application/cloudevents+json", payload)
}

// runNATSServerOnPort starts an embedded NATS server on the given port, based
// on a copy of natsserver.DefaultTestOptions. It returns the server together
// with a function that shuts it down.
func runNATSServerOnPort(port int) (*server.Server, func()) {
	testOpts := natsserver.DefaultTestOptions
	testOpts.Port = port

	natsSrv := runNatsWithOptions(&testOpts)
	return natsSrv, natsSrv.Shutdown
}
// runNatsWithOptions starts an embedded NATS server with the supplied options
// by delegating to natsserver.RunServer and returns the resulting server.
func runNatsWithOptions(opts *server.Options) *server.Server {
	natsSrv := natsserver.RunServer(opts)
	return natsSrv
}

func runNATSServer() (*server.Server, func()) {
svr := natsserver.RunRandClientPortServer()
return svr, func() { svr.Shutdown() }
Expand Down
2 changes: 1 addition & 1 deletion distributor/pkg/receiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func Test_ReceiveFromNATSAndForwardEventForOverlappingSubscriptions(t *testing.T
}

func Test_ReceiveFromNATS_AfterReconnecting(t *testing.T) {
const natsTestPort = 8369
const natsTestPort = 8370
natsURL := fmt.Sprintf("nats://127.0.0.1:%d", natsTestPort)
_, shutdownNats := runNATSServerOnPort(natsTestPort)
defer shutdownNats()
Expand Down

0 comments on commit 9b69d64

Please sign in to comment.