Skip to content

Commit

Permalink
feat: Replace Kafka module (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
HofmeisterAn committed Feb 27, 2023
1 parent 8a6daad commit 69453ec
Show file tree
Hide file tree
Showing 24 changed files with 322 additions and 348 deletions.
1 change: 0 additions & 1 deletion Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
<PackageReference Update="Azure.Data.Tables" Version="12.6.1" />
<PackageReference Update="Azure.Storage.Blobs" Version="12.13.0" />
<PackageReference Update="Azure.Storage.Queues" Version="12.11.0" />
<PackageReference Update="Confluent.Kafka" Version="1.8.2" />
<PackageReference Update="Microsoft.Azure.Cosmos" Version="3.30.0" />
</ItemGroup>
</Project>
14 changes: 14 additions & 0 deletions Testcontainers.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Elasticsearc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.EventStoreDb", "src\Testcontainers.EventStoreDb\Testcontainers.EventStoreDb.csproj", "{84D707E0-C9FA-4327-85DC-0AFEBEA73572}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Kafka", "src\Testcontainers.Kafka\Testcontainers.Kafka.csproj", "{E93E40CE-59AA-4561-9B4C-E7B0A686326E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.LocalStack", "src\Testcontainers.LocalStack\Testcontainers.LocalStack.csproj", "{3792268A-EF08-4569-8118-991E08FD61C4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.MariaDb", "src\Testcontainers.MariaDb\Testcontainers.MariaDb.csproj", "{4B204EB3-C478-422E-9B6F-62DF3871291A}"
Expand Down Expand Up @@ -63,6 +65,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Elasticsearc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.EventStoreDb.Tests", "tests\Testcontainers.EventStoreDb.Tests\Testcontainers.EventStoreDb.Tests.csproj", "{64F8E9B9-78FD-4E13-BDDF-0340E2D4E1D0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Kafka.Tests", "tests\Testcontainers.Kafka.Tests\Testcontainers.Kafka.Tests.csproj", "{6F2AEE03-629A-4B49-BD5B-25CA3C61FFB7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.LocalStack.Tests", "tests\Testcontainers.LocalStack.Tests\Testcontainers.LocalStack.Tests.csproj", "{728CBE16-1D52-4F84-AF01-7229E6013512}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.MariaDb.Tests", "tests\Testcontainers.MariaDb.Tests\Testcontainers.MariaDb.Tests.csproj", "{7F0AE083-9DB8-4BD4-91F7-C199DCC7301D}"
Expand Down Expand Up @@ -134,6 +138,10 @@ Global
{84D707E0-C9FA-4327-85DC-0AFEBEA73572}.Debug|Any CPU.Build.0 = Debug|Any CPU
{84D707E0-C9FA-4327-85DC-0AFEBEA73572}.Release|Any CPU.ActiveCfg = Release|Any CPU
{84D707E0-C9FA-4327-85DC-0AFEBEA73572}.Release|Any CPU.Build.0 = Release|Any CPU
{E93E40CE-59AA-4561-9B4C-E7B0A686326E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E93E40CE-59AA-4561-9B4C-E7B0A686326E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E93E40CE-59AA-4561-9B4C-E7B0A686326E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E93E40CE-59AA-4561-9B4C-E7B0A686326E}.Release|Any CPU.Build.0 = Release|Any CPU
{3792268A-EF08-4569-8118-991E08FD61C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3792268A-EF08-4569-8118-991E08FD61C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3792268A-EF08-4569-8118-991E08FD61C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -214,6 +222,10 @@ Global
{64F8E9B9-78FD-4E13-BDDF-0340E2D4E1D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64F8E9B9-78FD-4E13-BDDF-0340E2D4E1D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64F8E9B9-78FD-4E13-BDDF-0340E2D4E1D0}.Release|Any CPU.Build.0 = Release|Any CPU
{6F2AEE03-629A-4B49-BD5B-25CA3C61FFB7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6F2AEE03-629A-4B49-BD5B-25CA3C61FFB7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6F2AEE03-629A-4B49-BD5B-25CA3C61FFB7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6F2AEE03-629A-4B49-BD5B-25CA3C61FFB7}.Release|Any CPU.Build.0 = Release|Any CPU
{728CBE16-1D52-4F84-AF01-7229E6013512}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{728CBE16-1D52-4F84-AF01-7229E6013512}.Debug|Any CPU.Build.0 = Debug|Any CPU
{728CBE16-1D52-4F84-AF01-7229E6013512}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -305,6 +317,7 @@ Global
{2EAFA567-9F68-4C52-9DBC-8F3EC11BB2CE} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{641DDEA5-B6E0-41E6-BA11-7A28C0913127} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{84D707E0-C9FA-4327-85DC-0AFEBEA73572} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{E93E40CE-59AA-4561-9B4C-E7B0A686326E} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{3792268A-EF08-4569-8118-991E08FD61C4} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{4B204EB3-C478-422E-9B6F-62DF3871291A} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{1266E1E6-5CEF-4161-8B45-83282455746E} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
Expand All @@ -325,6 +338,7 @@ Global
{101515E6-74C1-40F9-85C8-871F742A378D} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{DD5B3678-468F-4D73-AECE-705E3D66CD43} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{64F8E9B9-78FD-4E13-BDDF-0340E2D4E1D0} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{6F2AEE03-629A-4B49-BD5B-25CA3C61FFB7} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{728CBE16-1D52-4F84-AF01-7229E6013512} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{7F0AE083-9DB8-4BD4-91F7-C199DCC7301D} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{5DB1F35F-B714-4B62-84BE-16A33084D3E1} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
Expand Down
1 change: 1 addition & 0 deletions src/Testcontainers.Kafka/.editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
root = true
108 changes: 108 additions & 0 deletions src/Testcontainers.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
namespace Testcontainers.Kafka;

/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
[PublicAPI]
public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer, KafkaConfiguration>
{
public const string KafkaImage = "confluentinc/cp-kafka:6.1.9";

public const ushort KafkaPort = 9092;

public const ushort BrokerPort = 9093;

public const ushort ZookeeperPort = 2181;

public const string StartupScriptFilePath = "/testcontainers.sh";

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
public KafkaBuilder()
: this(new KafkaConfiguration())
{
DockerResourceConfiguration = Init().DockerResourceConfiguration;
}

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
private KafkaBuilder(KafkaConfiguration resourceConfiguration)
: base(resourceConfiguration)
{
DockerResourceConfiguration = resourceConfiguration;
}

/// <inheritdoc />
protected override KafkaConfiguration DockerResourceConfiguration { get; }

/// <inheritdoc />
public override KafkaContainer Build()
{
Validate();
return new KafkaContainer(DockerResourceConfiguration, TestcontainersSettings.Logger);
}

/// <inheritdoc />
protected override KafkaBuilder Init()
{
return base.Init()
.WithImage(KafkaImage)
.WithPortBinding(KafkaPort, true)
.WithPortBinding(BrokerPort, true)
.WithPortBinding(ZookeeperPort, true)
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort)
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
.WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort)
.WithEntrypoint("/bin/sh", "-c")
.WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath)
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaServer id=\\d+\\] started"))
.WithStartupCallback((container, ct) =>
{
const char lf = '\n';
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);
startupScript.Append("echo 'clientPort=" + ZookeeperPort + "' > zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("zookeeper-server-start zookeeper.properties &");
startupScript.Append(lf);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.Hostname + ":" + BrokerPort);
startupScript.Append(lf);
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
startupScript.Append(lf);
startupScript.Append("/etc/confluent/docker/run");
return container.CopyFileAsync(StartupScriptFilePath, Encoding.Default.GetBytes(startupScript.ToString()), 493, ct: ct);
});
}

/// <inheritdoc />
protected override KafkaBuilder Clone(IResourceConfiguration<CreateContainerParameters> resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override KafkaBuilder Clone(IContainerConfiguration resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override KafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue)
{
return new KafkaBuilder(new KafkaConfiguration(oldValue, newValue));
}
}
53 changes: 53 additions & 0 deletions src/Testcontainers.Kafka/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace Testcontainers.Kafka;

/// <inheritdoc cref="ContainerConfiguration" />
[PublicAPI]
public sealed class KafkaConfiguration : ContainerConfiguration
{
/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
public KafkaConfiguration()
{
}

/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
public KafkaConfiguration(IResourceConfiguration<CreateContainerParameters> resourceConfiguration)
: base(resourceConfiguration)
{
// Passes the configuration upwards to the base implementations to create an updated immutable copy.
}

/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
public KafkaConfiguration(IContainerConfiguration resourceConfiguration)
: base(resourceConfiguration)
{
// Passes the configuration upwards to the base implementations to create an updated immutable copy.
}

/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
public KafkaConfiguration(KafkaConfiguration resourceConfiguration)
: this(new KafkaConfiguration(), resourceConfiguration)
{
// Passes the configuration upwards to the base implementations to create an updated immutable copy.
}

/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
/// <param name="oldValue">The old Docker resource configuration.</param>
/// <param name="newValue">The new Docker resource configuration.</param>
public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue)
: base(oldValue, newValue)
{
}
}
25 changes: 25 additions & 0 deletions src/Testcontainers.Kafka/KafkaContainer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace Testcontainers.Kafka;

/// <inheritdoc cref="DockerContainer" />
[PublicAPI]
public sealed class KafkaContainer : DockerContainer
{
/// <summary>
/// Initializes a new instance of the <see cref="KafkaContainer" /> class.
/// </summary>
/// <param name="configuration">The container configuration.</param>
/// <param name="logger">The logger.</param>
public KafkaContainer(KafkaConfiguration configuration, ILogger logger)
: base(configuration, logger)
{
}

/// <summary>
/// Gets the broker address.
/// </summary>
/// <returns>The broker address.</returns>
public string GetBootstrapAddress()
{
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
}
}
12 changes: 12 additions & 0 deletions src/Testcontainers.Kafka/Testcontainers.Kafka.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2022.3.1"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(SolutionDir)src/Testcontainers/Testcontainers.csproj"/>
</ItemGroup>
</Project>
8 changes: 8 additions & 0 deletions src/Testcontainers.Kafka/Usings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
global using System;
global using System.Text;
global using Docker.DotNet.Models;
global using DotNet.Testcontainers.Builders;
global using DotNet.Testcontainers.Configurations;
global using DotNet.Testcontainers.Containers;
global using JetBrains.Annotations;
global using Microsoft.Extensions.Logging;
24 changes: 14 additions & 10 deletions src/Testcontainers.Redpanda/RedpandaBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ public sealed class RedpandaBuilder : ContainerBuilder<RedpandaBuilder, Redpanda
{
public const string RedpandaImage = "docker.redpanda.com/vectorized/redpanda:v22.2.1";

public const ushort SchemaRegistryPort = 8081;
public const ushort RedpandaPort = 9092;

public const ushort BrokerPort = 9092;
public const ushort SchemaRegistryPort = 8081;

public const string StarterScript = "/testcontainers.sh";
public const string StartupScriptFilePath = "/testcontainers.sh";

/// <summary>
/// Initializes a new instance of the <see cref="RedpandaBuilder" /> class.
Expand Down Expand Up @@ -47,17 +47,21 @@ protected override RedpandaBuilder Init()
return base.Init()
.WithImage(RedpandaImage)
.WithPortBinding(SchemaRegistryPort, true)
.WithPortBinding(BrokerPort, true)
.WithPortBinding(RedpandaPort, true)
.WithEntrypoint("/bin/sh", "-c")
.WithCommand("while [ ! -f " + StarterScript + " ]; do sleep 0.1; done; " + StarterScript)
.WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath)
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("Started Kafka API server"))
.WithStartupCallback((container, ct) =>
{
var cmd = "#!/bin/bash\n";
cmd += "/usr/bin/rpk redpanda start --mode dev-container ";
cmd += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
cmd += "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + container.Hostname + ":" + container.GetMappedPublicPort(BrokerPort);
return container.CopyFileAsync(StarterScript, Encoding.Default.GetBytes(cmd), 493, ct: ct);
const char lf = '\n';
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);
startupScript.Append("/usr/bin/rpk redpanda start ");
startupScript.Append("--mode dev-container ");
startupScript.Append("--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ");
startupScript.Append("--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + container.Hostname + ":" + container.GetMappedPublicPort(RedpandaPort));
return container.CopyFileAsync(StartupScriptFilePath, Encoding.Default.GetBytes(startupScript.ToString()), 493, ct: ct);
});
}

Expand Down
8 changes: 4 additions & 4 deletions src/Testcontainers.Redpanda/RedpandaContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ public string GetSchemaRegistryAddress()
}

/// <summary>
/// Gets the broker address.
/// Gets the bootstrap address.
/// </summary>
/// <returns>The broker address.</returns>
public string GetBrokerAddress()
/// <returns>The bootstrap address.</returns>
public string GetBootstrapAddress()
{
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(RedpandaBuilder.BrokerPort)).ToString();
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(RedpandaBuilder.RedpandaPort)).ToString();
}
}

This file was deleted.

Loading

0 comments on commit 69453ec

Please sign in to comment.