From 353be65d240a6e5129d9d2cb4fd10f1434489aca Mon Sep 17 00:00:00 2001 From: Eric Chlebek Date: Mon, 12 Feb 2024 09:25:44 -0800 Subject: [PATCH 1/7] Follow HTTP redirects after failed WS dials This commit allows the opamp client to follow redirects after websocket handshake failures. Redirect following is not implemented by gorilla/websocket, but can be handled by inspecting the returned response object for 3xx status and Location header. --- client/wsclient.go | 24 +++++++++++++++++--- client/wsclient_test.go | 50 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 268ffddd..805b56d8 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -29,7 +29,7 @@ type wsClient struct { requestHeader http.Header // Websocket dialer and connection. - dialer websocket.Dialer + dialer *websocket.Dialer conn *websocket.Conn connMutex sync.RWMutex @@ -57,7 +57,7 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro } // Prepare connection settings. - c.dialer = *websocket.DefaultDialer + c.dialer = websocket.DefaultDialer var err error c.url, err = url.Parse(settings.OpAMPServerURL) @@ -131,8 +131,26 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh c.common.Callbacks.OnConnectFailed(ctx, err) } if resp != nil { - c.common.Logger.Errorf(ctx, "Server responded with status=%v", resp.Status) duration := sharedinternal.ExtractRetryAfterHeader(resp) + if resp.StatusCode >= 300 && resp.StatusCode < 400 { + // very liberal handling of 3xx that largely ignores HTTP semantics + redirect, err := resp.Location() + if err != nil { + c.common.Logger.Errorf(ctx, "3xx redirect, but no valid location: %s", err) + return err, duration + } + if redirect.Scheme == "http" || redirect.Scheme == "" { + redirect.Scheme = "ws" + } else if redirect.Scheme == "https" { + redirect.Scheme = "wss" + } + c.common.Logger.Debugf(ctx, "%d redirect to %s", resp.StatusCode, redirect) + // Set the URL to the redirect, so that it connects to it on the + // next cycle. + c.url = redirect + } else { + c.common.Logger.Errorf(ctx, "Server responded with status=%v", resp.Status) + } return err, duration } return err, sharedinternal.OptionalDuration{Defined: false} diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 7696e9e6..70aea811 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -3,6 +3,9 @@ package client import ( "context" "fmt" + "net/http" + "net/http/httptest" + "net/url" "strings" "sync/atomic" "testing" @@ -177,3 +180,50 @@ func TestVerifyWSCompress(t *testing.T) { }) } } + +func redirectServer(to string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + http.Redirect(w, req, to, http.StatusSeeOther) + })) +} + +func TestRedirectWS(t *testing.T) { + redirectee := internal.StartMockServer(t) + redirector := redirectServer("ws://" + redirectee.Endpoint) + defer redirector.Close() + + var conn atomic.Value + redirectee.OnWSConnect = func(c *websocket.Conn) { + conn.Store(c) + } + + // Start an OpAMP/WebSocket client. + var connected int64 + var connectErr atomic.Value + settings := types.StartSettings{ + Callbacks: types.CallbacksStruct{ + OnConnectFunc: func(ctx context.Context) { + atomic.StoreInt64(&connected, 1) + }, + OnConnectFailedFunc: func(ctx context.Context, err error) { + if err != websocket.ErrBadHandshake { + connectErr.Store(err) + } + }, + }, + } + reURL, err := url.Parse(redirector.URL) + assert.NoError(t, err) + reURL.Scheme = "ws" + settings.OpAMPServerURL = reURL.String() + client := NewWebSocket(nil) + startClient(t, settings, client) + + // Wait for connection to be established. + eventually(t, func() bool { return conn.Load() != nil }) + assert.True(t, connectErr.Load() == nil) + + // Stop the client. + err = client.Stop(context.Background()) + assert.NoError(t, err) +} From 7b2dd067b9417b04dcd5cb7b17384cb993e2a161 Mon Sep 17 00:00:00 2001 From: Eric Chlebek Date: Tue, 13 Feb 2024 09:42:57 -0800 Subject: [PATCH 2/7] Copy default dialer again --- client/wsclient.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 805b56d8..12077c0e 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -29,7 +29,7 @@ type wsClient struct { requestHeader http.Header // Websocket dialer and connection. - dialer *websocket.Dialer + dialer websocket.Dialer conn *websocket.Conn connMutex sync.RWMutex @@ -57,7 +57,7 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro } // Prepare connection settings. - c.dialer = websocket.DefaultDialer + c.dialer = *websocket.DefaultDialer var err error c.url, err = url.Parse(settings.OpAMPServerURL) From 0db7a24eed07942d6978595fa703e36aea41c133 Mon Sep 17 00:00:00 2001 From: Eric Chlebek Date: Tue, 13 Feb 2024 10:22:25 -0800 Subject: [PATCH 3/7] Improve error message somewhat --- client/wsclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/wsclient.go b/client/wsclient.go index 12077c0e..901f51e0 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -136,7 +136,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh // very liberal handling of 3xx that largely ignores HTTP semantics redirect, err := resp.Location() if err != nil { - c.common.Logger.Errorf(ctx, "3xx redirect, but no valid location: %s", err) + c.common.Logger.Errorf(ctx, "%d redirect, but no valid location: %s", resp.StatusCode, err) return err, duration } if redirect.Scheme == "http" || redirect.Scheme == "" { From bd590afd5af5a37a6e77ea5017a92dd68ccc5bfe Mon Sep 17 00:00:00 2001 From: Eric Chlebek Date: Tue, 13 Feb 2024 11:18:30 -0800 Subject: [PATCH 4/7] Force zero duration delay in retry loop --- client/wsclient.go | 5 +++++ internal/retryafter.go | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/client/wsclient.go b/client/wsclient.go index 901f51e0..44c4c92a 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -3,6 +3,7 @@ package client import ( "context" "errors" + "fmt" "net/http" "net/url" "sync" @@ -136,6 +137,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh // very liberal handling of 3xx that largely ignores HTTP semantics redirect, err := resp.Location() if err != nil { + fmt.Println("error in location", err) c.common.Logger.Errorf(ctx, "%d redirect, but no valid location: %s", resp.StatusCode, err) return err, duration } @@ -148,6 +150,9 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh // Set the URL to the redirect, so that it connects to it on the // next cycle. c.url = redirect + + // don't delay in following the redirect + duration = sharedinternal.ZeroDuration } else { c.common.Logger.Errorf(ctx, "Server responded with status=%v", resp.Status) } diff --git a/internal/retryafter.go b/internal/retryafter.go index 784473bd..25f168c7 100644 --- a/internal/retryafter.go +++ b/internal/retryafter.go @@ -17,6 +17,12 @@ type OptionalDuration struct { Defined bool } +// ZeroDuration represents a zero length duration that is defined. +var ZeroDuration = OptionalDuration{ + Duration: 0, + Defined: true, +} + func parseDelaySeconds(s string) (time.Duration, error) { n, err := strconv.Atoi(s) From f11788b2eab17e93c6e5ec9fa4bf89702882b870 Mon Sep 17 00:00:00 2001 From: Eric Chlebek Date: Tue, 13 Feb 2024 11:19:55 -0800 Subject: [PATCH 5/7] Test that http scheme is rewritten as ws *Don't* test for missing schemes, they are not valid according to Request.Location(). Remove handling for missing schemes. --- client/wsclient.go | 3 +- client/wsclient_test.go | 81 ++++++++++++++++++++++++----------------- 2 files changed, 50 insertions(+), 34 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 44c4c92a..c2e9237c 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -141,7 +141,8 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh c.common.Logger.Errorf(ctx, "%d redirect, but no valid location: %s", resp.StatusCode, err) return err, duration } - if redirect.Scheme == "http" || redirect.Scheme == "" { + // rewrite the scheme for the sake of tolerance + if redirect.Scheme == "http" { redirect.Scheme = "ws" } else if redirect.Scheme == "https" { redirect.Scheme = "wss" diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 70aea811..4e87a3b0 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -189,41 +189,56 @@ func redirectServer(to string) *httptest.Server { func TestRedirectWS(t *testing.T) { redirectee := internal.StartMockServer(t) - redirector := redirectServer("ws://" + redirectee.Endpoint) - defer redirector.Close() - - var conn atomic.Value - redirectee.OnWSConnect = func(c *websocket.Conn) { - conn.Store(c) - } - - // Start an OpAMP/WebSocket client. - var connected int64 - var connectErr atomic.Value - settings := types.StartSettings{ - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(ctx context.Context) { - atomic.StoreInt64(&connected, 1) - }, - OnConnectFailedFunc: func(ctx context.Context, err error) { - if err != websocket.ErrBadHandshake { - connectErr.Store(err) - } - }, + tests := []struct { + Name string + Redirector *httptest.Server + }{ + { + Name: "redirect ws scheme", + Redirector: redirectServer("ws://" + redirectee.Endpoint), + }, + { + Name: "redirect http scheme", + Redirector: redirectServer("http://" + redirectee.Endpoint), }, } - reURL, err := url.Parse(redirector.URL) - assert.NoError(t, err) - reURL.Scheme = "ws" - settings.OpAMPServerURL = reURL.String() - client := NewWebSocket(nil) - startClient(t, settings, client) - // Wait for connection to be established. - eventually(t, func() bool { return conn.Load() != nil }) - assert.True(t, connectErr.Load() == nil) + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + var conn atomic.Value + redirectee.OnWSConnect = func(c *websocket.Conn) { + conn.Store(c) + } - // Stop the client. - err = client.Stop(context.Background()) - assert.NoError(t, err) + // Start an OpAMP/WebSocket client. + var connected int64 + var connectErr atomic.Value + settings := types.StartSettings{ + Callbacks: types.CallbacksStruct{ + OnConnectFunc: func(ctx context.Context) { + atomic.StoreInt64(&connected, 1) + }, + OnConnectFailedFunc: func(ctx context.Context, err error) { + if err != websocket.ErrBadHandshake { + connectErr.Store(err) + } + }, + }, + } + reURL, err := url.Parse(test.Redirector.URL) + assert.NoError(t, err) + reURL.Scheme = "ws" + settings.OpAMPServerURL = reURL.String() + client := NewWebSocket(nil) + startClient(t, settings, client) + + // Wait for connection to be established. + eventually(t, func() bool { return conn.Load() != nil }) + assert.True(t, connectErr.Load() == nil) + + // Stop the client. + err = client.Stop(context.Background()) + assert.NoError(t, err) + }) + } } From 341966d63a69649a3fabe288b633c63139a17dd2 Mon Sep 17 00:00:00 2001 From: Eric Chlebek Date: Wed, 14 Feb 2024 11:15:48 -0800 Subject: [PATCH 6/7] Improve coverage, revert zero duration change --- client/wsclient.go | 6 +----- client/wsclient_test.go | 28 +++++++++++++++++++++++----- internal/retryafter.go | 6 ------ 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index c2e9237c..8c725dc5 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -3,7 +3,6 @@ package client import ( "context" "errors" - "fmt" "net/http" "net/url" "sync" @@ -137,7 +136,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh // very liberal handling of 3xx that largely ignores HTTP semantics redirect, err := resp.Location() if err != nil { - fmt.Println("error in location", err) + c.common.Callbacks.OnConnectFailed(ctx, err) c.common.Logger.Errorf(ctx, "%d redirect, but no valid location: %s", resp.StatusCode, err) return err, duration } @@ -151,9 +150,6 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh // Set the URL to the redirect, so that it connects to it on the // next cycle. c.url = redirect - - // don't delay in following the redirect - duration = sharedinternal.ZeroDuration } else { c.common.Logger.Errorf(ctx, "Server responded with status=%v", resp.Status) } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 4e87a3b0..f828e059 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -181,25 +181,37 @@ func TestVerifyWSCompress(t *testing.T) { } } -func redirectServer(to string) *httptest.Server { +func redirectServer(to string, status int) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { http.Redirect(w, req, to, http.StatusSeeOther) })) } +func errServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(302) + })) +} + func TestRedirectWS(t *testing.T) { redirectee := internal.StartMockServer(t) tests := []struct { Name string Redirector *httptest.Server + ExpError bool }{ { Name: "redirect ws scheme", - Redirector: redirectServer("ws://" + redirectee.Endpoint), + Redirector: redirectServer("ws://"+redirectee.Endpoint, 302), }, { Name: "redirect http scheme", - Redirector: redirectServer("http://" + redirectee.Endpoint), + Redirector: redirectServer("http://"+redirectee.Endpoint, 302), + }, + { + Name: "missing location header", + Redirector: errServer(), + ExpError: true, }, } @@ -233,8 +245,14 @@ func TestRedirectWS(t *testing.T) { startClient(t, settings, client) // Wait for connection to be established. - eventually(t, func() bool { return conn.Load() != nil }) - assert.True(t, connectErr.Load() == nil) + eventually(t, func() bool { return conn.Load() != nil || connectErr.Load() != nil }) + if test.ExpError { + if connectErr.Load() == nil { + t.Error("expected non-nil error") + } + } else { + assert.True(t, connectErr.Load() == nil) + } // Stop the client. err = client.Stop(context.Background()) diff --git a/internal/retryafter.go b/internal/retryafter.go index 25f168c7..784473bd 100644 --- a/internal/retryafter.go +++ b/internal/retryafter.go @@ -17,12 +17,6 @@ type OptionalDuration struct { Defined bool } -// ZeroDuration represents a zero length duration that is defined. -var ZeroDuration = OptionalDuration{ - Duration: 0, - Defined: true, -} - func parseDelaySeconds(s string) (time.Duration, error) { n, err := strconv.Atoi(s) From f6cc5df2dfe341beda365b75803a42c2c98c5297 Mon Sep 17 00:00:00 2001 From: Eric Chlebek Date: Wed, 21 Feb 2024 12:04:38 -0800 Subject: [PATCH 7/7] Store retry errors in the wsclient --- client/wsclient.go | 7 ++++++- client/wsclient_test.go | 6 ++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 8c725dc5..db179997 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "sync" + "sync/atomic" "time" "github.com/cenkalti/backoff/v4" @@ -35,6 +36,10 @@ type wsClient struct { // The sender is responsible for sending portion of the OpAMP protocol. sender *internal.WSSender + + // last non-nil internal error that was encountered in the conn retry loop, + // currently used only for testing. + lastInternalErr atomic.Pointer[error] } // NewWebSocket creates a new OpAMP Client that uses WebSocket transport. @@ -136,7 +141,6 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh // very liberal handling of 3xx that largely ignores HTTP semantics redirect, err := resp.Location() if err != nil { - c.common.Callbacks.OnConnectFailed(ctx, err) c.common.Logger.Errorf(ctx, "%d redirect, but no valid location: %s", resp.StatusCode, err) return err, duration } @@ -187,6 +191,7 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { case <-timer.C: { if err, retryAfter := c.tryConnectOnce(ctx); err != nil { + c.lastInternalErr.Store(&err) if errors.Is(err, context.Canceled) { c.common.Logger.Debugf(ctx, "Client is stopped, will not try anymore.") return err diff --git a/client/wsclient_test.go b/client/wsclient_test.go index f828e059..d0b8b4e6 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -245,9 +245,11 @@ func TestRedirectWS(t *testing.T) { startClient(t, settings, client) // Wait for connection to be established. - eventually(t, func() bool { return conn.Load() != nil || connectErr.Load() != nil }) + eventually(t, func() bool { + return conn.Load() != nil || connectErr.Load() != nil || client.lastInternalErr.Load() != nil + }) if test.ExpError { - if connectErr.Load() == nil { + if connectErr.Load() == nil && client.lastInternalErr.Load() == nil { t.Error("expected non-nil error") } } else {