Skip to content

Commit

Permalink
Log retried errors (#2613)
Browse files Browse the repository at this point in the history
Co-authored-by: Manuel de la Peña <[email protected]>
  • Loading branch information
ash2k and mdelapenya authored Jul 4, 2024
1 parent d60fc7c commit 9cd7bcb
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 157 deletions.
98 changes: 57 additions & 41 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,20 +889,25 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st

var buildError error
var resp types.ImageBuildResponse
err = backoff.Retry(func() error {
resp, err = p.client.ImageBuild(ctx, buildOptions.Context, buildOptions)
if err != nil {
buildError = errors.Join(buildError, err)
if isPermanentClientError(err) {
return backoff.Permanent(err)
err = backoff.RetryNotify(
func() error {
resp, err = p.client.ImageBuild(ctx, buildOptions.Context, buildOptions)
if err != nil {
buildError = errors.Join(buildError, err)
if isPermanentClientError(err) {
return backoff.Permanent(err)
}
return err
}
Logger.Printf("Failed to build image: %s, will retry", err)
return err
}
defer p.Close()

return nil
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
defer p.Close()

return nil
},
backoff.WithContext(backoff.NewExponentialBackOff(), ctx),
func(err error, duration time.Duration) {
p.Logger.Printf("Failed to build image: %s, will retry", err)
},
)
if err != nil {
return "", errors.Join(buildError, err)
}
Expand Down Expand Up @@ -1015,7 +1020,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

if modifiedTag != imageName {
Logger.Printf("✍🏼 Replacing image with %s. From: %s to %s\n", is.Description(), imageName, modifiedTag)
p.Logger.Printf("✍🏼 Replacing image with %s. From: %s to %s\n", is.Description(), imageName, modifiedTag)
imageName = modifiedTag
}
}
Expand Down Expand Up @@ -1180,23 +1185,29 @@ func (p *DockerProvider) findContainerByName(ctx context.Context, name string) (
}

func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string) (*types.Container, error) {
var ctr *types.Container
return ctr, backoff.Retry(func() error {
c, err := p.findContainerByName(ctx, name)
if err != nil {
if !errdefs.IsNotFound(err) && isPermanentClientError(err) {
return backoff.Permanent(err)
return backoff.RetryNotifyWithData(
func() (*types.Container, error) {
c, err := p.findContainerByName(ctx, name)
if err != nil {
if !errdefs.IsNotFound(err) && isPermanentClientError(err) {
return nil, backoff.Permanent(err)
}
return nil, err
}
return err
}

if c == nil {
return fmt.Errorf("container %s not found", name)
}

ctr = c
return nil
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
if c == nil {
return nil, errdefs.NotFound(fmt.Errorf("container %s not found", name))
}
return c, nil
},
backoff.WithContext(backoff.NewExponentialBackOff(), ctx),
func(err error, duration time.Duration) {
if errdefs.IsNotFound(err) {
return
}
p.Logger.Printf("Waiting for container. Got an error: %v; Retrying in %d seconds", err, duration/time.Second)
},
)
}

func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
Expand Down Expand Up @@ -1283,19 +1294,24 @@ func (p *DockerProvider) attemptToPullImage(ctx context.Context, tag string, pul
}

var pull io.ReadCloser
err = backoff.Retry(func() error {
pull, err = p.client.ImagePull(ctx, tag, pullOpt)
if err != nil {
if isPermanentClientError(err) {
return backoff.Permanent(err)
err = backoff.RetryNotify(
func() error {
pull, err = p.client.ImagePull(ctx, tag, pullOpt)
if err != nil {
if isPermanentClientError(err) {
return backoff.Permanent(err)
}
return err
}
Logger.Printf("Failed to pull image: %s, will retry", err)
return err
}
defer p.Close()

return nil
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
defer p.Close()

return nil
},
backoff.WithContext(backoff.NewExponentialBackOff(), ctx),
func(err error, duration time.Duration) {
p.Logger.Printf("Failed to pull image: %s, will retry", err)
},
)
if err != nil {
return err
}
Expand Down
48 changes: 27 additions & 21 deletions lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,33 +215,39 @@ var defaultReadinessHook = func() ContainerLifecycleHooks {
b.MaxElapsedTime = 1 * time.Second
b.MaxInterval = 5 * time.Second

err := backoff.Retry(func() error {
jsonRaw, err := dockerContainer.inspectRawContainer(ctx)
if err != nil {
return err
}
err := backoff.RetryNotify(
func() error {
jsonRaw, err := dockerContainer.inspectRawContainer(ctx)
if err != nil {
return err
}

exposedAndMappedPorts := jsonRaw.NetworkSettings.Ports

for _, exposedPort := range dockerContainer.exposedPorts {
portMap := nat.Port(exposedPort)
// having entries in exposedAndMappedPorts, where the key is the exposed port,
// and the value is the mapped port, means that the port has been already mapped.
if _, ok := exposedAndMappedPorts[portMap]; !ok {
// check if the port is mapped with the protocol (default is TCP)
if !strings.Contains(exposedPort, "/") {
portMap = nat.Port(fmt.Sprintf("%s/tcp", exposedPort))
if _, ok := exposedAndMappedPorts[portMap]; !ok {
exposedAndMappedPorts := jsonRaw.NetworkSettings.Ports

for _, exposedPort := range dockerContainer.exposedPorts {
portMap := nat.Port(exposedPort)
// having entries in exposedAndMappedPorts, where the key is the exposed port,
// and the value is the mapped port, means that the port has been already mapped.
if _, ok := exposedAndMappedPorts[portMap]; !ok {
// check if the port is mapped with the protocol (default is TCP)
if !strings.Contains(exposedPort, "/") {
portMap = nat.Port(fmt.Sprintf("%s/tcp", exposedPort))
if _, ok := exposedAndMappedPorts[portMap]; !ok {
return fmt.Errorf("port %s is not mapped yet", exposedPort)
}
} else {
return fmt.Errorf("port %s is not mapped yet", exposedPort)
}
} else {
return fmt.Errorf("port %s is not mapped yet", exposedPort)
}
}
}

return nil
}, b)
return nil
},
b,
func(err error, duration time.Duration) {
dockerContainer.logger.Printf("All requested ports were not exposed: %v", err)
},
)
if err != nil {
return fmt.Errorf("all exposed ports, %s, were not mapped in 5s: %w", dockerContainer.exposedPorts, err)
}
Expand Down
109 changes: 52 additions & 57 deletions modules/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"path/filepath"
"testing"
"time"

ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestClickHouseConnectionHost(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performCRUD(conn)
data, err := performCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
}
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestClickHouseDSN(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performCRUD(conn)
data, err := performCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
}
Expand Down Expand Up @@ -223,7 +224,7 @@ func TestClickHouseWithConfigFile(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performCRUD(conn)
data, err := performCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
})
Expand Down Expand Up @@ -287,75 +288,69 @@ func TestClickHouseWithZookeeper(t *testing.T) {
defer conn.Close()

// perform assertions
data, err := performReplicatedCRUD(conn)
data, err := performReplicatedCRUD(t, conn)
require.NoError(t, err)
assert.Len(t, data, 1)
}

func performReplicatedCRUD(conn driver.Conn) ([]Test, error) {
var (
err error
res []Test
)

err = backoff.Retry(func() error {
err = conn.Exec(context.Background(), "CREATE TABLE replicated_test_table (id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/mdb.data_transfer_cp_cdc', '{replica}') PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return err
}

err = conn.Exec(context.Background(), "INSERT INTO replicated_test_table (id) VALUES (1);")
if err != nil {
return err
}

rows, err := conn.Query(context.Background(), "SELECT * FROM replicated_test_table;")
if err != nil {
return err
}
func performReplicatedCRUD(t *testing.T, conn driver.Conn) ([]Test, error) {
return backoff.RetryNotifyWithData(
func() ([]Test, error) {
err := conn.Exec(context.Background(), "CREATE TABLE replicated_test_table (id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/mdb.data_transfer_cp_cdc', '{replica}') PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return nil, err
}

for rows.Next() {
var r Test
err = conn.Exec(context.Background(), "INSERT INTO replicated_test_table (id) VALUES (1);")
if err != nil {
return nil, err
}

err := rows.Scan(&r.Id)
rows, err := conn.Query(context.Background(), "SELECT * FROM replicated_test_table;")
if err != nil {
return err
return nil, err
}

res = append(res, r)
}
return nil
}, backoff.NewExponentialBackOff())
var res []Test
for rows.Next() {
var r Test

return res, err
}
err := rows.Scan(&r.Id)
if err != nil {
return nil, err
}

func performCRUD(conn driver.Conn) ([]Test, error) {
var (
err error
rows []Test
res = append(res, r)
}
return res, nil
},
backoff.NewExponentialBackOff(),
func(err error, duration time.Duration) {
t.Log(err)
},
)
}

err = backoff.Retry(func() error {
err = conn.Exec(context.Background(), "create table if not exists test_table (id UInt64) engine = MergeTree PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return err
}

err = conn.Exec(context.Background(), "INSERT INTO test_table (id) VALUES (1);")
if err != nil {
return err
}

rows, err = getAllRows(conn)
if err != nil {
return err
}
func performCRUD(t *testing.T, conn driver.Conn) ([]Test, error) {
return backoff.RetryNotifyWithData(
func() ([]Test, error) {
err := conn.Exec(context.Background(), "create table if not exists test_table (id UInt64) engine = MergeTree PRIMARY KEY (id) ORDER BY (id) SETTINGS index_granularity = 8192;")
if err != nil {
return nil, err
}

return nil
}, backoff.NewExponentialBackOff())
err = conn.Exec(context.Background(), "INSERT INTO test_table (id) VALUES (1);")
if err != nil {
return nil, err
}

return rows, err
return getAllRows(conn)
},
backoff.NewExponentialBackOff(),
func(err error, duration time.Duration) {
t.Log(err)
},
)
}

func getAllRows(conn driver.Conn) ([]Test, error) {
Expand Down
Loading

0 comments on commit 9cd7bcb

Please sign in to comment.