Skip to content

Commit

Permalink
Merge pull request #6 from imperiuse/fix_testcontainers
Browse files Browse the repository at this point in the history
fix. testscontainers
  • Loading branch information
imperiuse authored Oct 16, 2022
2 parents 32bf06d + 7168654 commit b716f91
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 249 deletions.
93 changes: 93 additions & 0 deletions testcontainer/environment/containers_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package environment

import (
"fmt"

"github.com/imperiuse/golib/testcontainer"
"github.com/testcontainers/testcontainers-go/wait"
)

const (
NetworkName = "test_id_provisioning_network"

AppName = "app"

KafkaImage = "confluentinc/cp-kafka:7.2.0"
KafkaBrokerPort = "9092"
KafkaClientPort = "9101"
KafkaContainerName = "broker"

ZookeeperImage = "confluentinc/cp-zookeeper:7.2.0"
ZooKeeperPort = "2181"
ZooKeeperContainerName = "zookeeper"
ZooTickTime = "2000"

PostgresImage = "postgres:14"
PostgresPort = "5432"
PostgresContainerName = "db"
PostgresUsername = "admin"
PostgresPassword = "password"
PostgresDB = "id-provisioning"
)

var (
zooCfg = testcontainer.ZookeeperConfig{
BaseContainerConfig: testcontainer.BaseContainerConfig{
Image: ZookeeperImage,
Port: ZooKeeperPort,
Name: ZooKeeperContainerName,
ExposedPorts: []string{fmt.Sprintf("0.0.0.0:%[1]s:%[1]s", ZooKeeperPort)},
Envs: map[string]string{
"ZOOKEEPER_SERVER_ID": "1",
"ZOOKEEPER_SERVERS": "zoo1:2888:3888",
"ZOOKEEPER_CLIENT_PORT": ZooKeeperPort,
"ZOOKEEPER_TICK_TIME": ZooTickTime,
},
WaitingForStrategy: wait.ForLog("binding to port 0.0.0.0/0.0.0.0:2181"),
},
}

kafkaCfg = testcontainer.KafkaConfig{
BaseContainerConfig: testcontainer.BaseContainerConfig{
Image: KafkaImage,
Name: KafkaContainerName,
Port: KafkaClientPort,
ExposedPorts: []string{
fmt.Sprintf("0.0.0.0:%[1]s:%[1]s", KafkaClientPort),
fmt.Sprintf("0.0.0.0:%[1]s:%[1]s", KafkaBrokerPort),
},
Envs: map[string]string{
"KAFKA_BROKER_ID": "1",
"KAFKA_ZOOKEEPER_CONNECT": ZooKeeperContainerName + ":" + ZooKeeperPort,
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
"KAFKA_ADVERTISED_HOST_NAME": KafkaContainerName,
"KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://" + KafkaContainerName + ":29092,PLAINTEXT_HOST://localhost:" +
KafkaBrokerPort,
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": "0",
"KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
"KAFKA_AUTO_CREATE_TOPICS.ENABLE": "true",
},
WaitingForStrategy: wait.ForLog("[KafkaServer id=1] started"),
},
ClientPort: KafkaClientPort,
BrokerPort: KafkaBrokerPort,
}

postgresCfg = testcontainer.PostgresConfig{BaseContainerConfig: testcontainer.BaseContainerConfig{
Name: PostgresContainerName,
Image: PostgresImage,
Port: PostgresPort,
ExposedPorts: []string{fmt.Sprintf("0.0.0.0:%[1]s:%[1]s", PostgresPort)},
Envs: map[string]string{
"POSTGRES_USER": PostgresUsername,
"POSTGRES_PASSWORD": PostgresPassword,
"POSTGRES_DB": PostgresDB,
},
WaitingForStrategy: wait.ForLog("database system is ready to accept connections"),
},
}
)
82 changes: 82 additions & 0 deletions testcontainer/environment/environment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package environment

import (
"context"
"testing"
"time"

"github.com/imperiuse/golib/testcontainer"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)

type ContainersEnvironment struct {
// First - native go-testcontainers way for creating docker containers.
dockerNetwork *testcontainers.DockerNetwork
kafkaContainer *testcontainer.KafkaCluster
postgresContainer testcontainers.Container

// Second - docker-compose way + go-testcontainers for create docker container env.
compose testcontainers.DockerCompose
}

// StartPureDockerEnvironment - create and start docker containers env with first way.
func (c *ContainersEnvironment) StartPureDockerEnvironment(t *testing.T, ctx context.Context) {
t.Log("> From SetupEnvironment")

t.Log("Create docker network")
dn, err := testcontainer.NewDockerNetwork(ctx, NetworkName)
require.Nil(t, err, "error must be nil for NewDockerNetwork")
require.NotNil(t, dn, "docker network must be not nil")
c.dockerNetwork = dn.(*testcontainers.DockerNetwork)

t.Log("Create service deps")
c.kafkaContainer, err = testcontainer.NewKafkaCluster(ctx, kafkaCfg, zooCfg, c.dockerNetwork)
require.Nil(t, err, "error must be nil, when create NewKafkaCluster")
require.NotNil(t, c.kafkaContainer, "kafka cluster must be not nil")

c.postgresContainer, err = testcontainer.NewPostgresContainer(ctx, postgresCfg, c.dockerNetwork)
require.Nil(t, err, "error must be nil, when create NewPostgresContainer")
require.NotNil(t, c.postgresContainer, "postgres container must be not nil")

t.Log("Start deps services containers")

require.Nil(t, c.kafkaContainer.Start(ctx), "kafka cluster must start without errors")

require.Nil(t, c.postgresContainer.Start(ctx), "postgres must start without errors")

const magicTime = time.Second * 3
time.Sleep(magicTime) // time sleep development // todo think how to remove this
}

// FinishedPureDockerEnvironment - finished containers (env) which we created by first way.
func (c *ContainersEnvironment) FinishedPureDockerEnvironment(t *testing.T, ctx context.Context) {
require.Nil(t, testcontainer.TerminateIfNotNil(ctx, c.kafkaContainer), "must not get an error while terminate kafka cluster")
require.Nil(t, testcontainer.TerminateIfNotNil(ctx, c.postgresContainer), "must not get an error while terminate postgres cluster")
require.Nil(t, c.dockerNetwork.Remove(ctx), "must not get an error while remove docker network")
}

// StartDockerComposeEnvironment - create and start docker containers env with second way.
func (c *ContainersEnvironment) StartDockerComposeEnvironment(
t *testing.T,
composeFilePaths []string,
identifier string,
) {
c.compose = testcontainers.NewLocalDockerCompose(composeFilePaths, identifier).
WaitForService(PostgresContainerName, wait.ForLog("database system is ready to accept connections")).
WaitForService(ZooKeeperContainerName, wait.ForLog("binding to port 0.0.0.0/0.0.0.0:"+ZooKeeperPort)).
WaitForService(KafkaContainerName, wait.ForLog("[KafkaServer id=1] started"))

if len(composeFilePaths) > 1 { // this is little tricky hack here. :)
// if we have one docker-compose file for app container, that add wait strategy.
c.compose = c.compose.WaitForService(AppName, wait.ForLog("App starting successfully! Ready for hard work!"))
}

require.Nil(t, c.compose.WithCommand([]string{"up", "--force-recreate", "-d"}).Invoke().Error)
}

// FinishedDockerComposeEnvironment - finished containers (env) which we created by second way.
func (c *ContainersEnvironment) FinishedDockerComposeEnvironment(t *testing.T) {
require.Nil(t, c.compose.Down().Error, "docker compose must down without errors")
}
61 changes: 13 additions & 48 deletions testcontainer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,25 @@ package testcontainer
import (
"context"
"fmt"
"time"

"github.com/testcontainers/testcontainers-go"
)

type (
// KafkaCluster - kafka cluster struct (together kafka container and zookeeper).
KafkaCluster struct {
KafkaContainer testcontainers.Container
ZookeeperContainer testcontainers.Container
KafkaContainer Container
ZookeeperContainer Container

KafkaURI string

kafkaCfg KafkaConfig
zooCfg ZookeeperConfig
}

// KafkaConfig - kafka container config with zookeeper.
KafkaConfig struct {
BaseContainerConfig
ClientPort string
BrokerPort string
BaseContainerConfig
}
)

Expand All @@ -31,21 +30,22 @@ func NewKafkaCluster(
ctx context.Context,
kafkaCfg KafkaConfig,
zooCfg ZookeeperConfig,
dockerNetwork *testcontainers.DockerNetwork,
dockerNetwork *DockerNetwork,
) (*KafkaCluster, error) {
zookeeperContainer, err := NewZookeeperContainer(ctx, zooCfg, dockerNetwork)
if err != nil {
return nil, err
}

kafkaContainer, err := NewKafkaContainer(ctx, kafkaCfg, zooCfg, dockerNetwork)
kafkaContainer, err := NewKafkaContainer(ctx, kafkaCfg, dockerNetwork)
if err != nil {
return nil, err
}

return &KafkaCluster{
ZookeeperContainer: zookeeperContainer,
KafkaContainer: kafkaContainer,
KafkaURI: fmt.Sprintf("%s:%s", kafkaCfg.Name, kafkaCfg.Port),
kafkaCfg: kafkaCfg,
zooCfg: zooCfg,
}, nil
Expand All @@ -54,54 +54,19 @@ func NewKafkaCluster(
// NewKafkaContainer - create new kafka container, but do not start it yet.
func NewKafkaContainer(
ctx context.Context,
kafkaCfg KafkaConfig,
zooCfg ZookeeperConfig,
dockerNetwork *testcontainers.DockerNetwork,
) (testcontainers.Container, error) {
if len(kafkaCfg.ExposedPorts) == 0 {
kafkaCfg.ExposedPorts = []string{kafkaCfg.ClientPort}
}

if len(kafkaCfg.Envs) == 0 {
kafkaCfg.Envs = map[string]string{
"KAFKA_BROKER_ID": "1",
"KAFKA_ZOOKEEPER_CONNECT": "zookeeper:" + zooCfg.Port,
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
"KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://" + kafkaCfg.Name + ":29092,PLAINTEXT_HOST://localhost:" +
kafkaCfg.BrokerPort,
"KAFKA_METRIC_REPORTERS": "io.confluent.metrics.reporter.ConfluentMetricsReporter",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": "0",
"KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
"KAFKA_JMX_PORT": "9101",
"KAFKA_JMX_HOSTNAME": "localhost",
"KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL": "http://schema-registry:8089",
"CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS": kafkaCfg.Name + ":29092",
"CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS": "1",
"CONFLUENT_METRICS_ENABLE": "true",
"CONFLUENT_SUPPORT_CUSTOMER_ID": "anonymous",
"KAFKA_AUTO_CREATE_TOPICS.ENABLE": "true",
}
}

cfg KafkaConfig,
dockerNetwork *DockerNetwork,
) (Container, error) {
// creates the kafka container, but do not start it yet
return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: GetBaseContainerRequest(kafkaCfg.BaseContainerConfig, dockerNetwork),
})
return NewGenericContainer(ctx, cfg.BaseContainerConfig, dockerNetwork)
}

// Start - start ZookeeperContainer and KafkaContainer.
func (c *KafkaCluster) Start(ctx context.Context) error {
if err := c.ZookeeperContainer.Start(ctx); err != nil {
return fmt.Errorf("could not start container. err: %w", err)
return fmt.Errorf("could not start Zookeeper container. err: %w", err)
}

const waitZoo = 5 * time.Second
time.Sleep(waitZoo) // time sleep development

return c.KafkaContainer.Start(ctx)
}

Expand Down
10 changes: 4 additions & 6 deletions testcontainer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@ package testcontainer

import (
"context"

"github.com/testcontainers/testcontainers-go"
)

// NewDockerNetwork - create new docker network.
func NewDockerNetwork(ctx context.Context, name string) (testcontainers.Network, error) {
func NewDockerNetwork(ctx context.Context, name string) (Network, error) {
if err := PullDockerImage(ctx, ReaperImage); err != nil {
return nil, err
}

return testcontainers.GenericNetwork(
ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
return GenericNetwork(
ctx, GenericNetworkRequest{
NetworkRequest: NetworkRequest{
Name: name,
ReaperImage: ReaperImage,
SkipReaper: IsSkipReaperImage,
Expand Down
44 changes: 2 additions & 42 deletions testcontainer/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,20 @@ package testcontainer

import (
"context"
"fmt"

"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)

// NginxConfig - nginx container config.
type NginxConfig struct {
BaseContainerConfig
}

type nginxContainer struct {
testcontainers.Container
URI string
}

// NewNginxContainer - create nginx container, but do not start it yet.
func NewNginxContainer(
ctx context.Context,
cfg NginxConfig,
dockerNetwork *testcontainers.DockerNetwork,
runContainer bool,
) (*nginxContainer, error) {
cfg.ExposedPorts = []string{cfg.Port + "/tcp"}

cr := GetBaseContainerRequest(cfg.BaseContainerConfig, dockerNetwork)

cr.WaitingFor = wait.ForHTTP("/")

container, err := testcontainers.GenericContainer(ctx,
testcontainers.GenericContainerRequest{
ContainerRequest: cr,
Started: runContainer,
},
)
if err != nil {
return nil, err
}

ip, err := container.Host(ctx)
if err != nil {
return nil, err
}

mappedPort, err := container.MappedPort(ctx, nat.Port(cfg.Port))
if err != nil {
return nil, err
}

uri := fmt.Sprintf("http://%s:%s", ip, mappedPort.Port())

return &nginxContainer{
Container: container,
URI: uri,
}, nil
) (Container, error) {
return NewGenericContainer(ctx, cfg.BaseContainerConfig, dockerNetwork)
}
Loading

0 comments on commit b716f91

Please sign in to comment.