Skip to content

Commit

Permalink
Consolidate Kafka resources. (#2129)
Browse files Browse the repository at this point in the history
* Consolidate Kafka resources.

* Update src/Aspire.Hosting/Kafka/KafkaBuilderExtensions.cs

Co-authored-by: James Newton-King <[email protected]>

* Fix up fat finger.

---------

Co-authored-by: James Newton-King <[email protected]>
  • Loading branch information
mitchdenny and JamesNK authored Feb 8, 2024
1 parent 4024549 commit 90caa5a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 59 deletions.
33 changes: 14 additions & 19 deletions src/Aspire.Hosting/Kafka/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,35 @@ namespace Aspire.Hosting;
public static class KafkaBuilderExtensions
{
private const int KafkaBrokerPort = 9092;

/// <summary>
/// Adds a Kafka broker container to the application.
/// Changes the Kafka resource to be published as a container in the manifest.
/// </summary>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency.</param>
/// <param name="port">The host port of Kafka broker.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaContainerResource}"/></returns>
public static IResourceBuilder<KafkaContainerResource> AddKafkaContainer(this IDistributedApplicationBuilder builder, string name, int? port = null)
/// <param name="builder">Resource builder for <see cref="KafkaServerResource"/>.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{T}"/>.</returns>
public static IResourceBuilder<KafkaServerResource> PublishAsContainer(this IResourceBuilder<KafkaServerResource> builder)
{
var kafka = new KafkaContainerResource(name);
return builder.AddResource(kafka)
.WithEndpoint(hostPort: port, containerPort: KafkaBrokerPort)
.WithAnnotation(new ContainerImageAnnotation { Image = "confluentinc/confluent-local", Tag = "latest" })
.WithManifestPublishingCallback(context => WriteKafkaContainerToManifest(context, kafka))
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));
return builder.WithManifestPublishingCallback(context => WriteKafkaContainerToManifest(context, builder.Resource));
}

static void WriteKafkaContainerToManifest(ManifestPublishingContext context, KafkaContainerResource resource)
{
context.WriteContainer(resource);
context.Writer.WriteString("connectionString", $"{{{resource.Name}.bindings.tcp.host}}:{{{resource.Name}.bindings.tcp.port}}");
}
private static void WriteKafkaContainerToManifest(ManifestPublishingContext context, KafkaServerResource resource)
{
context.WriteContainer(resource);
context.Writer.WriteString("connectionString", $"{{{resource.Name}.bindings.tcp.host}}:{{{resource.Name}.bindings.tcp.port}}");
}

/// <summary>
/// Adds a Kafka resource to the application. A container is used for local development.
/// </summary>
/// <param name="builder">The <see cref="IDistributedApplicationBuilder"/>.</param>
/// <param name="name">The name of the resource. This name will be used as the connection string name when referenced in a dependency</param>
/// <param name="port">The host port of Kafka broker.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{KafkaServerResource}"/>.</returns>
public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedApplicationBuilder builder, string name)
public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedApplicationBuilder builder, string name, int? port = null)
{
var kafka = new KafkaServerResource(name);
return builder.AddResource(kafka)
.WithEndpoint(containerPort: KafkaBrokerPort)
.WithEndpoint(containerPort: KafkaBrokerPort, hostPort: port)
.WithAnnotation(new ContainerImageAnnotation{ Image = "confluentinc/confluent-local", Tag = "latest" })
.WithManifestPublishingCallback(WriteKafkaServerToManifest)
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));
Expand Down
37 changes: 0 additions & 37 deletions src/Aspire.Hosting/Kafka/KafkaContainerResource.cs

This file was deleted.

6 changes: 4 additions & 2 deletions src/Aspire.Hosting/Kafka/KafkaServerResource.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.ApplicationModel;

namespace Aspire.Hosting;

/// <summary>
/// A resource that represents a Kafka broker.
/// </summary>
/// <param name="name">The name of the resource.</param>
public class KafkaServerResource(string name) : Resource(name), IResourceWithConnectionString, IResourceWithEnvironment
public class KafkaServerResource(string name) : ContainerResource(name), IResourceWithConnectionString, IResourceWithEnvironment
{
/// <summary>
/// Gets the connection string for Kafka broker.
Expand Down
2 changes: 1 addition & 1 deletion tests/Aspire.Hosting.Tests/ManifestGenerationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void EnsureAllKafkaManifestTypesHaveVersion0Suffix()
var program = CreateTestProgramJsonDocumentManifestPublisher();

program.AppBuilder.AddKafka("kafkaabstract");
program.AppBuilder.AddKafkaContainer("kafkacontainer");
program.AppBuilder.AddKafka("kafkacontainer").PublishAsContainer();

// Build AppHost so that publisher can be resolved.
program.Build();
Expand Down

0 comments on commit 90caa5a

Please sign in to comment.