From 2d41bfcea5187c75e5b1a3f3263cf2f8cde9db82 Mon Sep 17 00:00:00 2001 From: Dave Tucker Date: Fri, 20 Aug 2021 09:38:42 +0100 Subject: [PATCH] client: Clean up RPC client on failed connect This issue was caught debugging reconnect failures. We attempt to reconnect as soon as the server disconnects, which allows us to open a socket, but there is no reply (listdbs fails) at this point, when we retry again, the rpcClient is not nil, so we assume we're already connected and don't return an error. This commit ensures that the rpcClient is set to nil if we fail to fully connect and adjusts our tests to perform a container restart instead of simply disconnecting the client to test the reconnect behaviour. Signed-off-by: Dave Tucker --- client/client.go | 7 +++++++ test/ovs/ovs_integration_test.go | 19 +++++++++++-------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index d1b214ff..f5b3b910 100644 --- a/client/client.go +++ b/client/client.go @@ -145,6 +145,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { // FIXME: This only emits the error from the last attempted connection return fmt.Errorf("failed to connect to endpoints %q: %v", o.options.endpoints, err) } + if err := o.createRPC2Client(c); err != nil { return err } @@ -152,6 +153,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { dbs, err := o.listDbs(ctx) if err != nil { o.rpcClient.Close() + o.rpcClient = nil return err } @@ -164,6 +166,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { } if !found { o.rpcClient.Close() + o.rpcClient = nil return fmt.Errorf("target database not found") } @@ -180,8 +183,10 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { if err != nil { o.rpcClient.Close() + o.rpcClient = nil return err } + o.schemaMutex.Lock() o.schema = schema o.schemaMutex.Unlock() @@ -192,6 +197,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { o.api = newAPI(o.cache) } else { o.rpcClient.Close() + o.rpcClient = nil return err } o.cacheMutex.Unlock() @@ -208,6 +214,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { err = o.monitor(ctx, id, reconnect, request...) if err != nil { o.rpcClient.Close() + o.rpcClient = nil return err } } diff --git a/test/ovs/ovs_integration_test.go b/test/ovs/ovs_integration_test.go index 6d5aa2fd..0b69b475 100644 --- a/test/ovs/ovs_integration_test.go +++ b/test/ovs/ovs_integration_test.go @@ -2,7 +2,6 @@ package ovs import ( "context" - "fmt" "os" "reflect" "strings" @@ -43,7 +42,10 @@ func (suite *OVSIntegrationSuite) SetupSuite() { Repository: "libovsdb/ovs", Tag: tag, ExposedPorts: []string{"6640/tcp"}, - Tty: true, + PortBindings: map[docker.Port][]docker.PortBinding{ + "6640/tcp": {{HostPort: "56640"}}, + }, + Tty: true, } hostConfig := func(config *docker.HostConfig) { // set AutoRemove to true so that stopped container goes away by itself @@ -56,8 +58,8 @@ func (suite *OVSIntegrationSuite) SetupSuite() { suite.resource, err = suite.pool.RunWithOptions(options, hostConfig) require.NoError(suite.T(), err) - // set expiry to 60 seconds so containers are cleaned up on test panic - err = suite.resource.Expire(60) + // set expiry to 90 seconds so containers are cleaned up on test panic + err = suite.resource.Expire(90) require.NoError(suite.T(), err) // let the container start before we attempt connection @@ -66,7 +68,7 @@ func (suite *OVSIntegrationSuite) SetupSuite() { err = suite.pool.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - endpoint := fmt.Sprintf("tcp::%s", suite.resource.GetPort("6640/tcp")) + endpoint := "tcp::56640" ovs, err := client.NewOVSDBClient( defDB, client.WithEndpoint(endpoint), @@ -230,7 +232,7 @@ func (suite *OVSIntegrationSuite) TestWithReconnect() { // Reconfigure err = suite.client.SetOption( - client.WithReconnect(2*time.Second, backoff.NewExponentialBackOff()), + client.WithReconnect(500*time.Millisecond, &backoff.ZeroBackOff{}), ) require.NoError(suite.T(), err) @@ -278,12 +280,13 @@ func (suite *OVSIntegrationSuite) TestWithReconnect() { require.Equal(suite.T(), bridgeName, br.Name) // trigger reconnect - suite.client.Disconnect() + err = suite.pool.Client.RestartContainer(suite.resource.Container.ID, 0) + require.NoError(suite.T(), err) // check that we are automatically reconnected require.Eventually(suite.T(), func() bool { return suite.client.Connected() - }, 2*time.Second, 500*time.Millisecond) + }, 20*time.Second, 1*time.Second) err = suite.client.Echo(context.TODO()) require.NoError(suite.T(), err)