From 650c6f029199ddd4e250c55dcd2ffe9386a78d5b Mon Sep 17 00:00:00 2001 From: gokerakc Date: Mon, 12 Feb 2024 13:09:14 +0000 Subject: [PATCH] feat: added new topic configuration properties --- .../Kafka/KafkaChannelBinding.cs | 4 ++++ .../Kafka/TopicConfigurationObject.cs | 24 +++++++++++++++++++ src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs | 4 ++++ .../Bindings/Kafka/KafkaBindings_Should.cs | 10 +++++++- 4 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/LEGO.AsyncAPI.Bindings/Kafka/KafkaChannelBinding.cs b/src/LEGO.AsyncAPI.Bindings/Kafka/KafkaChannelBinding.cs index 48fa5f04..d8c43556 100644 --- a/src/LEGO.AsyncAPI.Bindings/Kafka/KafkaChannelBinding.cs +++ b/src/LEGO.AsyncAPI.Bindings/Kafka/KafkaChannelBinding.cs @@ -51,6 +51,10 @@ public class KafkaChannelBinding : ChannelBinding { "retention.bytes", (a, n) => { a.RetentionBytes = n.GetIntegerValue(); } }, { "delete.retention.ms", (a, n) => { a.DeleteRetentionMiliseconds = n.GetIntegerValue(); } }, { "max.message.bytes", (a, n) => { a.MaxMessageBytes = n.GetIntegerValue(); } }, + { "confluent.key.schema.validation", (a, n) => { a.ConfluentKeySchemaValidation = n.GetBooleanValue(); } }, + { "confluent.key.subject.name.strategy", (a, n) => { a.ConfluentKeySubjectName = n.GetScalarValue(); } }, + { "confluent.value.schema.validation", (a, n) => { a.ConfluentValueSchemaValidation = n.GetBooleanValue(); } }, + { "confluent.value.subject.name.strategy", (a, n) => { a.ConfluentValueSubjectName = n.GetScalarValue(); } }, }; /// diff --git a/src/LEGO.AsyncAPI.Bindings/Kafka/TopicConfigurationObject.cs b/src/LEGO.AsyncAPI.Bindings/Kafka/TopicConfigurationObject.cs index da6233c3..2cbf6b4c 100644 --- a/src/LEGO.AsyncAPI.Bindings/Kafka/TopicConfigurationObject.cs +++ b/src/LEGO.AsyncAPI.Bindings/Kafka/TopicConfigurationObject.cs @@ -33,6 +33,26 @@ public class TopicConfigurationObject : IAsyncApiElement /// The max.message.bytes configuration option. /// public int? MaxMessageBytes { get; set; } + + /// + /// The confluent.key.schema.validation configuration option. + /// + public bool? ConfluentKeySchemaValidation { get; set; } + + /// + /// The confluent.key.subject.name.strategy configuration option. + /// + public string ConfluentKeySubjectName { get; set; } + + /// + /// The confluent.value.schema.validation configuration option. + /// + public bool? ConfluentValueSchemaValidation { get; set; } + + /// + /// The confluent.value.subject.name.strategy configuration option. + /// + public string ConfluentValueSubjectName { get; set; } public void Serialize(IAsyncApiWriter writer) { @@ -47,6 +67,10 @@ public void Serialize(IAsyncApiWriter writer) writer.WriteOptionalProperty(AsyncApiConstants.RetentionBytes, this.RetentionBytes); writer.WriteOptionalProperty(AsyncApiConstants.DeleteRetentionMiliseconds, this.DeleteRetentionMiliseconds); writer.WriteOptionalProperty(AsyncApiConstants.MaxMessageBytes, this.MaxMessageBytes); + writer.WriteOptionalProperty(AsyncApiConstants.ConfluentKeySchemaValidation, this.ConfluentKeySchemaValidation); + writer.WriteOptionalProperty(AsyncApiConstants.ConfluentKeySubjectName, this.ConfluentKeySubjectName); + writer.WriteOptionalProperty(AsyncApiConstants.ConfluentValueSchemaValidation, this.ConfluentValueSchemaValidation); + writer.WriteOptionalProperty(AsyncApiConstants.ConfluentValueSubjectName, this.ConfluentValueSubjectName); writer.WriteEndObject(); } } diff --git a/src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs b/src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs index 37b1b45d..d83915bf 100644 --- a/src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs +++ b/src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs @@ -136,6 +136,10 @@ public static class AsyncApiConstants public const string RetentionBytes = "retention.bytes"; public const string DeleteRetentionMiliseconds = "delete.retention.ms"; public const string MaxMessageBytes = "max.message.bytes"; + public const string ConfluentKeySchemaValidation = "confluent.key.schema.validation"; + public const string ConfluentKeySubjectName = "confluent.key.subject.name.strategy"; + public const string ConfluentValueSchemaValidation = "confluent.value.schema.validation"; + public const string ConfluentValueSubjectName = "confluent.value.subject.name.strategy"; public const string TopicConfiguration = "topicConfiguration"; public const string GeoReplication = "geo-replication"; public const string AdditionalItems = "additionalItems"; diff --git a/test/LEGO.AsyncAPI.Tests/Bindings/Kafka/KafkaBindings_Should.cs b/test/LEGO.AsyncAPI.Tests/Bindings/Kafka/KafkaBindings_Should.cs index efeac315..4a791556 100644 --- a/test/LEGO.AsyncAPI.Tests/Bindings/Kafka/KafkaBindings_Should.cs +++ b/test/LEGO.AsyncAPI.Tests/Bindings/Kafka/KafkaBindings_Should.cs @@ -30,7 +30,11 @@ public void KafkaChannelBinding_WithFilledObject_SerializesAndDeserializes() retention.ms: 1 retention.bytes: 2 delete.retention.ms: 3 - max.message.bytes: 4"; + max.message.bytes: 4 + confluent.key.schema.validation: true + confluent.key.subject.name.strategy: TopicNameStrategy + confluent.value.schema.validation: true + confluent.value.subject.name.strategy: TopicNameStrategy"; var channel = new AsyncApiChannel(); channel.Bindings.Add(new KafkaChannelBinding @@ -45,6 +49,10 @@ public void KafkaChannelBinding_WithFilledObject_SerializesAndDeserializes() RetentionBytes = 2, DeleteRetentionMiliseconds = 3, MaxMessageBytes = 4, + ConfluentKeySchemaValidation = true, + ConfluentKeySubjectName = "TopicNameStrategy", + ConfluentValueSchemaValidation = true, + ConfluentValueSubjectName = "TopicNameStrategy", }, });