diff --git a/client/client.go b/client/client.go index d1b214ff..f5b3b910 100644 --- a/client/client.go +++ b/client/client.go @@ -145,6 +145,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { // FIXME: This only emits the error from the last attempted connection return fmt.Errorf("failed to connect to endpoints %q: %v", o.options.endpoints, err) } + if err := o.createRPC2Client(c); err != nil { return err } @@ -152,6 +153,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { dbs, err := o.listDbs(ctx) if err != nil { o.rpcClient.Close() + o.rpcClient = nil return err } @@ -164,6 +166,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { } if !found { o.rpcClient.Close() + o.rpcClient = nil return fmt.Errorf("target database not found") } @@ -180,8 +183,10 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { if err != nil { o.rpcClient.Close() + o.rpcClient = nil return err } + o.schemaMutex.Lock() o.schema = schema o.schemaMutex.Unlock() @@ -192,6 +197,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { o.api = newAPI(o.cache) } else { o.rpcClient.Close() + o.rpcClient = nil return err } o.cacheMutex.Unlock() @@ -208,6 +214,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { err = o.monitor(ctx, id, reconnect, request...) if err != nil { o.rpcClient.Close() + o.rpcClient = nil return err } } diff --git a/test/ovs/ovs_integration_test.go b/test/ovs/ovs_integration_test.go index 6d5aa2fd..0b69b475 100644 --- a/test/ovs/ovs_integration_test.go +++ b/test/ovs/ovs_integration_test.go @@ -2,7 +2,6 @@ package ovs import ( "context" - "fmt" "os" "reflect" "strings" @@ -43,7 +42,10 @@ func (suite *OVSIntegrationSuite) SetupSuite() { Repository: "libovsdb/ovs", Tag: tag, ExposedPorts: []string{"6640/tcp"}, - Tty: true, + PortBindings: map[docker.Port][]docker.PortBinding{ + "6640/tcp": {{HostPort: "56640"}}, + }, + Tty: true, } hostConfig := func(config *docker.HostConfig) { // set AutoRemove to true so that stopped container goes away by itself @@ -56,8 +58,8 @@ func (suite *OVSIntegrationSuite) SetupSuite() { suite.resource, err = suite.pool.RunWithOptions(options, hostConfig) require.NoError(suite.T(), err) - // set expiry to 60 seconds so containers are cleaned up on test panic - err = suite.resource.Expire(60) + // set expiry to 90 seconds so containers are cleaned up on test panic + err = suite.resource.Expire(90) require.NoError(suite.T(), err) // let the container start before we attempt connection @@ -66,7 +68,7 @@ func (suite *OVSIntegrationSuite) SetupSuite() { err = suite.pool.Retry(func() error { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - endpoint := fmt.Sprintf("tcp::%s", suite.resource.GetPort("6640/tcp")) + endpoint := "tcp::56640" ovs, err := client.NewOVSDBClient( defDB, client.WithEndpoint(endpoint), @@ -230,7 +232,7 @@ func (suite *OVSIntegrationSuite) TestWithReconnect() { // Reconfigure err = suite.client.SetOption( - client.WithReconnect(2*time.Second, backoff.NewExponentialBackOff()), + client.WithReconnect(500*time.Millisecond, &backoff.ZeroBackOff{}), ) require.NoError(suite.T(), err) @@ -278,12 +280,13 @@ func (suite *OVSIntegrationSuite) TestWithReconnect() { require.Equal(suite.T(), bridgeName, br.Name) // trigger reconnect - suite.client.Disconnect() + err = suite.pool.Client.RestartContainer(suite.resource.Container.ID, 0) + require.NoError(suite.T(), err) // check that we are automatically reconnected require.Eventually(suite.T(), func() bool { return suite.client.Connected() - }, 2*time.Second, 500*time.Millisecond) + }, 20*time.Second, 1*time.Second) err = suite.client.Echo(context.TODO()) require.NoError(suite.T(), err)