Skip to content

Commit

Permalink
Almost fully automated ElToroRojo (no setup.ps1 required).
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil Stadryniak committed Dec 6, 2022
1 parent 544325c commit aa0c22e
Show file tree
Hide file tree
Showing 21 changed files with 241 additions and 104 deletions.
12 changes: 0 additions & 12 deletions ElToroRojo/Incoming.Api/Properties/launchSettings.json

This file was deleted.

15 changes: 13 additions & 2 deletions ElToroRojo/Incoming.Watchdog/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ LABEL apptype=consoleapp
WORKDIR /source

# copy csproj and restore as distinct layers
COPY *.csproj .
COPY Incoming.Watchdog.csproj .
RUN dotnet restore -r linux-musl-x64 /p:PublishReadyToRun=true

# copy and publish app and libraries
COPY . .
COPY Incoming.Watchdog.csproj Program.cs ./
RUN dotnet publish -c Release -o /app -r linux-musl-x64 --self-contained true --no-restore /p:PublishTrimmed=true /p:PublishReadyToRun=true /p:PublishSingleFile=true

# final stage/image
Expand All @@ -26,6 +26,17 @@ LABEL apptype=consoleapp
WORKDIR /app
COPY --from=build /app .

WORKDIR /builds/Incoming.Producer
COPY Incoming.Producer .

WORKDIR /builds/Incoming.Consumer
COPY Incoming.Consumer .

WORKDIR /builds/Incoming.Api
COPY Incoming.Api .

WORKDIR /app

RUN apk add docker openrc
ENTRYPOINT ["./Incoming.Watchdog"]

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ FROM mcr.microsoft.com/dotnet/runtime-deps:6.0.11-alpine3.16-amd64

LABEL purpose=api
LABEL case=queue
LABEL apptype=consoleapp
LABEL apptype=api

WORKDIR /app
COPY --from=build /app .
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@

using var queueConnection = connectionFactory.CreateConnection();
using var channel = queueConnection.CreateModel();
Console.WriteLine($"Connected to route: test-key. {DateTime.Now.ToLongTimeString()}");
Console.WriteLine($"Connected to incoming.queue. {DateTime.Now.ToLongTimeString()}");

channel.BasicQos(0, 1, false);
var eventsConsumer = new AsyncEventingBasicConsumer(channel);
channel.BasicConsume("test-queue", false, eventsConsumer);
Console.WriteLine($"Subscribed to queue: test-queue. {DateTime.Now.ToLongTimeString()}");
channel.BasicConsume($"{fileName}-queue", false, eventsConsumer);
Console.WriteLine($"Subscribed to queue: {fileName}-queue. {DateTime.Now.ToLongTimeString()}");

var currentlyProcessing = false;

Expand All @@ -63,7 +63,7 @@
while (!cancellationTokenSource.IsCancellationRequested || currentlyProcessing)
Thread.Sleep(TimeSpan.FromSeconds(1));

Console.WriteLine($"Closing connection to route: test-key. {DateTime.Now.ToLongTimeString()}");
Console.WriteLine($"Closing connection to incoming.queue. {DateTime.Now.ToLongTimeString()}");

channel.Close();
queueConnection.Close();
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
if (File.Exists(filePath))
{
var rows = File.ReadAllLines(filePath);
var fileName = Path.GetFileName(filePath);

var connectionFactory = new ConnectionFactory
{
Expand All @@ -22,15 +23,15 @@

using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();
Console.WriteLine($"Connected to route: test-key. {DateTime.Now.ToLongTimeString()}");
Console.WriteLine($"Connected to route: {fileName}-key. {DateTime.Now.ToLongTimeString()}");

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

foreach (var row in rows)
{
channel.BasicPublish("test-exchange", "test-key", true, properties, Encoding.UTF8.GetBytes(row));
Console.WriteLine($"Sending message: {row}, from file: {filePath}, to route: test-key. {DateTime.Now.ToLongTimeString()}");
channel.BasicPublish($"{fileName}-exchange", $"{fileName}-key", true, properties, Encoding.UTF8.GetBytes(row));
Console.WriteLine($"Sending message: {row}, from file: {filePath}, to route: {fileName}-key. {DateTime.Now.ToLongTimeString()}");
}

File.Delete(filePath);
Expand Down
1 change: 1 addition & 0 deletions ElToroRojo/Incoming.Watchdog/Incoming.Watchdog.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<ItemGroup>
<PackageReference Include="Docker.DotNet" Version="3.125.12" />
<PackageReference Include="SharpZipLib" Version="1.4.1" />
</ItemGroup>

</Project>
182 changes: 156 additions & 26 deletions ElToroRojo/Incoming.Watchdog/Program.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
using System.ComponentModel;
using System.Data;
using System.Text;
using Docker.DotNet;
using Docker.DotNet.Models;
using ICSharpCode.SharpZipLib.Tar;

const string workspaceDir = "/workspace";

Console.WriteLine($"Watchdog started. {DateTime.Now.ToShortTimeString()}");

Console.WriteLine($"Watchdog started. {DateTime.Now.ToLongTimeString()}");
while (true)
{
var files = Directory.GetFiles(workspaceDir);
var fileNames = Directory
.GetFiles(workspaceDir)
.Select(Path.GetFileName)
.Cast<string>()
.ToList();

if (files.Any())
if (fileNames.Any())
{
Console.WriteLine($"Beginning of processing {files.Length} file(s). {DateTime.Now.ToShortTimeString()}");
Console.WriteLine($"Beginning of processing {fileNames.Count} file(s). {DateTime.Now.ToLongTimeString()}");

var dockerEngineUrl = new Uri("unix://var/run/docker.sock");
using var client = new DockerClientConfiguration(dockerEngineUrl).CreateClient();

await CreateQueue(client, fileNames);

await CreateQueue(client);

foreach (var filePath in files)
foreach (var fileName in fileNames)
{
var fileName = Path.GetFileName(filePath);

await CreateApi(client, fileName);
await CreateConsumer(client, fileName);
await CreateProducer(client, fileName, filePath);
await CreateProducer(client, fileName);
}

await Cleanup(client, "api", new[] { "consumer" });
Expand All @@ -36,12 +39,16 @@
Thread.Sleep(TimeSpan.FromSeconds(1));
}

async Task CreateQueue(IDockerClient client)
async Task CreateQueue(IDockerClient client, ICollection<string> fileNames)
{
const string imageName = "rabbitmq:3.11-management";

await PullImage(client, imageName);

var parameters = new CreateContainerParameters
{
Name = "incoming.queue",
Image = "rabbitmq:3.11-management",
Image = imageName,
Hostname = "incoming.queue",
ExposedPorts = new Dictionary<string, EmptyStruct>
{
Expand All @@ -66,22 +73,25 @@ async Task CreateQueue(IDockerClient client)

var containerId = await CreateContainer(client, parameters, "Queue");

await SetupRabbitMq(client, containerId);
await SetupRabbitMq(client, containerId, fileNames);
}

async Task SetupRabbitMq(IDockerClient client, string containerId)
async Task SetupRabbitMq(IDockerClient client, string containerId, ICollection<string> fileNames)
{
await ExecCommandInsideContainer(client, containerId, "rabbitmqadmin list exchanges", 10);
Console.WriteLine($"RabbitMq is ready to use. {DateTime.Now.ToLongTimeString()}");

await ExecCommandInsideContainer(client, containerId, "rabbitmqadmin declare exchange name=test-exchange type=direct");
Console.WriteLine($"Declared exchange inside queue container with id: {containerId} {DateTime.Now.ToLongTimeString()}");
foreach (var fileName in fileNames)
{
await ExecCommandInsideContainer(client, containerId, $"rabbitmqadmin declare exchange name={fileName}-exchange type=direct");
Console.WriteLine($"Declared exchange '{fileName}-exchange' inside queue container with id: {containerId} {DateTime.Now.ToLongTimeString()}");

await ExecCommandInsideContainer(client, containerId, "rabbitmqadmin declare queue name=test-queue durable=false");
Console.WriteLine($"Declared queue inside queue container with id: {containerId} {DateTime.Now.ToLongTimeString()}");
await ExecCommandInsideContainer(client, containerId, $"rabbitmqadmin declare queue name={fileName}-queue durable=false");
Console.WriteLine($"Declared queue '{fileName}-queue' inside queue container with id: {containerId} {DateTime.Now.ToLongTimeString()}");

await ExecCommandInsideContainer(client, containerId, "rabbitmqadmin declare binding source=test-exchange destination_type=queue destination=test-queue routing_key=test-key");
Console.WriteLine($"Declared binding inside queue container with id: {containerId} {DateTime.Now.ToLongTimeString()}");
await ExecCommandInsideContainer(client, containerId, $"rabbitmqadmin declare binding source={fileName}-exchange destination_type=queue destination={fileName}-queue routing_key={fileName}-key");
Console.WriteLine($"Declared binding from: '{fileName}-exchange' to: '{fileName}-queue' with routing key:'{fileName}-key' inside queue container with id: {containerId} {DateTime.Now.ToLongTimeString()}");
}

await ExecCommandInsideContainer(client, containerId, "rabbitmqctl add_user admin admin");
Console.WriteLine($"Created user inside queue container with id: {containerId} {DateTime.Now.ToLongTimeString()}");
Expand Down Expand Up @@ -124,10 +134,14 @@ await execStream

async Task CreateApi(IDockerClient client, string fileName)
{
const string imageName = "incoming.api";
const string dockerFilePath = "/builds/Incoming.Api";
await BuildImage(client, imageName, dockerFilePath);

var parameters = new CreateContainerParameters
{
Name = $"incoming.api-{fileName}",
Image = "incoming.api:latest",
Image = imageName,
Hostname = $"incoming.api-{fileName}",
ExposedPorts = new Dictionary<string, EmptyStruct>
{
Expand All @@ -151,10 +165,14 @@ async Task CreateApi(IDockerClient client, string fileName)

async Task CreateConsumer(IDockerClient client, string fileName)
{
const string imageName = "incoming.consumer";
const string dockerFilePath = "/builds/Incoming.Consumer";
await BuildImage(client, imageName, dockerFilePath);

var parameters = new CreateContainerParameters
{
Name = $"incoming.consumer-{fileName}",
Image = "incoming.consumer:latest",
Image = imageName,
Hostname = $"incoming.consumer-{fileName}",
HostConfig = new HostConfig
{
Expand All @@ -173,12 +191,16 @@ async Task CreateConsumer(IDockerClient client, string fileName)
await ConnectToNetwork(client, containerId, "eltororojo_errordb-network");
}

async Task CreateProducer(IDockerClient client, string fileName, string filePath)
async Task CreateProducer(IDockerClient client, string fileName)
{
const string imageName = "incoming.producer";
const string dockerFilePath = "/builds/Incoming.Producer";
await BuildImage(client, imageName, dockerFilePath);

var parameters = new CreateContainerParameters
{
Name = $"incoming.producer-{fileName}",
Image = "incoming.producer:latest",
Image = imageName,
Hostname = $"incoming.producer-{fileName}",
HostConfig = new HostConfig
{
Expand All @@ -191,7 +213,7 @@ async Task CreateProducer(IDockerClient client, string fileName, string filePath
},
Env = new List<string>
{
$"FILE_PATH={filePath}"
$"FILE_PATH={Path.Combine(workspaceDir, fileName)}"
}
};

Expand Down Expand Up @@ -253,4 +275,112 @@ async Task Cleanup(IDockerClient client, string targetPurpose, ICollection<strin
await client.Containers.StopContainerAsync(targetContainerId, stopParameters);
Console.WriteLine($"Stopped (and automatically removed) {targetPurpose} container: {targetContainerId}. {DateTime.Now.ToLongTimeString()}");
}
}

async Task PullImage(IDockerClient client, string imageName)
{
var imageAlreadyExists = await ImageAlreadyExists(client, imageName);
if (!imageAlreadyExists)
{
Console.WriteLine($"Image named '{imageName}' not found. Pulling image. {DateTime.Now.ToLongTimeString()}");

var imagesCreateParameters = new ImagesCreateParameters
{
FromImage = imageName
};

var progress = new Progress<JSONMessage>();
progress.ProgressChanged += (sender, e) =>
{
Console.WriteLine($"From: {e.From}");
Console.WriteLine($"Status: {e.Status}");
Console.WriteLine($"Stream: {e.Stream}");
Console.WriteLine($"ID: {e.ID}");
Console.WriteLine($"Progress: {e.ProgressMessage}");
Console.WriteLine($"Error: {e.ErrorMessage}");
};

await client.Images.CreateImageAsync(imagesCreateParameters, new AuthConfig(), progress);

Console.WriteLine($"Image named '{imageName}' pulled. {DateTime.Now.ToLongTimeString()}");
}
}

async Task BuildImage(IDockerClient client, string imageName, string dockerFilePath, bool rebuild = false)
{
if (!rebuild)
rebuild = !await ImageAlreadyExists(client, imageName); //If should not rebuild existing image, check if it exists. If yes then do not build.

if (rebuild)
{
Console.WriteLine($"Image named '{imageName}' not found. Building image from {dockerFilePath}. {DateTime.Now.ToLongTimeString()}");

var imageBuildParameters = new ImageBuildParameters
{
Dockerfile = dockerFilePath
};

var progress = new Progress<JSONMessage>();
progress.ProgressChanged += (sender, e) =>
{
Console.WriteLine($"From: {e.From}");
Console.WriteLine($"Status: {e.Status}");
Console.WriteLine($"Stream: {e.Stream}");
Console.WriteLine($"ID: {e.ID}");
Console.WriteLine($"Progress: {e.ProgressMessage}");
Console.WriteLine($"Error: {e.ErrorMessage}");
};

var stream = CreateTarFileForDockerfileDirectory(dockerFilePath);

await client.Images.BuildImageFromDockerfileAsync(imageBuildParameters, stream, new AuthConfig[]{}, new Dictionary<string, string>(), progress);

Console.WriteLine($"Image named '{imageName}' built. {DateTime.Now.ToLongTimeString()}");
}
}

async Task<bool> ImageAlreadyExists(IDockerClient client, string imageName)
{
var listImagesParameters = new ImagesListParameters { All = true };
var images = await client.Images.ListImagesAsync(listImagesParameters);
return images.Any(image => image.RepoTags.Contains(imageName));
}

Stream CreateTarFileForDockerfileDirectory(string directory)
{
var stream = new MemoryStream();
var filePaths = Directory.GetFiles(directory, "*.*", SearchOption.AllDirectories);

using var archive = new TarOutputStream(stream, Encoding.UTF8);

//Prevent the TarOutputStream from closing the underlying memory stream when done
archive.IsStreamOwner = false;

//Add files to tar archive
foreach (var file in filePaths)
{
var tarName = Path.GetFileNameWithoutExtension(file);

var entry = TarEntry.CreateTarEntry(tarName);
using var fileStream = File.OpenRead(file);
entry.Size = fileStream.Length;
archive.PutNextEntry(entry);

var localBuffer = new byte[32 * 1024];

while (true)
{
var numberOfBytesSavedToBuffer = fileStream.Read(localBuffer, 0, localBuffer.Length);
if (numberOfBytesSavedToBuffer <= 0)
break;

archive.Write(localBuffer, 0, numberOfBytesSavedToBuffer);
}
archive.CloseEntry();
}

archive.Close();

stream.Position = 0;
return stream;
}
Loading

0 comments on commit aa0c22e

Please sign in to comment.