Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings): add pulsar bindings #74

Merged
merged 8 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ internal static IChannelBinding LoadChannelBinding(ParseNode node)
{
case BindingType.Kafka:
return LoadBinding("ChannelBinding", property.Value, kafkaChannelBindingFixedFields);
case BindingType.Pulsar:
return LoadBinding("ChannelBinding", property.Value, pulsarChannelBindingFixedFields);
case BindingType.Websockets:
return LoadBinding("ChannelBinding", property.Value, webSocketsChannelBindingFixedFields);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ internal static IServerBinding LoadServerBinding(ParseNode node)
{
case BindingType.Kafka:
return LoadBinding("ServerBinding", property.Value, kafkaServerBindingFixedFields);
case BindingType.Pulsar:
return LoadBinding("ServerBinding", property.Value, pulsarServerBindingFixedFields);
default:
throw new System.Exception("ServerBinding not found");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Readers
{
using LEGO.AsyncAPI.Models.Bindings.Pulsar;
using LEGO.AsyncAPI.Readers.ParseNodes;

internal static partial class AsyncApiDeserializer
{
private static FixedFieldMap<PulsarServerBinding> pulsarServerBindingFixedFields = new ()
{
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
{ "tenant", (a, n) => { a.Tenant = n.GetScalarValue(); } },
};

private static FixedFieldMap<PulsarChannelBinding> pulsarChannelBindingFixedFields = new ()
{
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
{ "namespace", (a, n) => { a.Namespace = n.GetScalarValue(); } },
{ "persistence", (a, n) => { a.Persistence = n.GetScalarValue(); } },
{ "compaction", (a, n) => { a.Compaction = n.GetIntegerValue(); } },
{ "retention", (a, n) => { a.Retention = LoadRetention(n); } },
{ "geo-replication", (a, n) => { a.GeoReplication = n.CreateSimpleList(s => s.GetScalarValue()); } },
{ "ttl", (a, n) => { a.TTL = n.GetIntegerValue(); } },
{ "deduplication", (a, n) => { a.Deduplication = n.GetBooleanValue(); } },
};

private static FixedFieldMap<RetentionDefinition> pulsarServerBindingRetentionFixedFields = new ()
{
{ "time", (a, n) => { a.Time = n.GetIntegerValue(); } },
{ "size", (a, n) => { a.Size = n.GetIntegerValue(); } },
};

private static RetentionDefinition LoadRetention(ParseNode node)
{
var mapNode = node.CheckMapNode("retention");
var retention = new RetentionDefinition();
ParseMap(mapNode, retention, pulsarServerBindingRetentionFixedFields, null);
return retention;
}
}
}
2 changes: 2 additions & 0 deletions src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ public static class AsyncApiConstants
public const string Size = "size";
public const string Persistence = "persistence";
public const string Compaction = "compaction";
public const string TTL = "ttl";
public const string Tenant = "tenant";
public const string Namespace = "namespace";
public const string GeoReplication = "geo-replication";
}
}
3 changes: 3 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/BindingType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ public enum BindingType

[Display("websockets")]
Websockets,

[Display("pulsar")]
Pulsar,
}
}
102 changes: 102 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/PulsarChannelBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for Pulsar server settings.
/// </summary>
public class PulsarChannelBinding : IChannelBinding
{
/// <summary>
/// The namespace associated with the topic.
/// </summary>
public string Namespace { get; set; }

/// <summary>
/// persistence of the topic in Pulsar persistent or non-persistent.
/// </summary>
public string Persistence { get; set; }

/// <summary>
/// Topic compaction threshold given in bytes.
/// </summary>
public int Compaction { get; set; }

/// <summary>
/// A list of clusters the topic is replicated to.
/// </summary>
public IEnumerable<string> GeoReplication { get; set; }

/// <summary>
/// Topic retention policy.
/// </summary>
public RetentionDefinition Retention { get; set; }

/// <summary>
/// Message Time-to-live in seconds.
/// </summary>
public int TTL { get; set; }

/// <summary>
/// When Message deduplication is enabled, it ensures that each message produced on Pulsar topics is persisted to disk only once.
/// </summary>
public bool Deduplication { get; set; }

/// <summary>
/// The version of this binding.
public string BindingVersion { get; set; }

public BindingType Type => BindingType.Pulsar;

public bool UnresolvedReference { get; set; }

public AsyncApiReference Reference { get; set; }

public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();
writer.WriteOptionalProperty(AsyncApiConstants.Namespace, this.Namespace);
writer.WriteOptionalProperty(AsyncApiConstants.Persistence, this.Persistence);
Alquila marked this conversation as resolved.
Show resolved Hide resolved
writer.WriteOptionalProperty<int>(AsyncApiConstants.Compaction, this.Compaction);
writer.WriteOptionalObject(AsyncApiConstants.Retention, this.Retention, (w, r) => r.Serialize(w));
writer.WriteOptionalCollection(AsyncApiConstants.GeoReplication, this.GeoReplication, (v, s) => v.WriteValue(s));
writer.WriteOptionalProperty<int>(AsyncApiConstants.TTL, this.TTL);
writer.WriteOptionalProperty(AsyncApiConstants.Deduplication, this.Deduplication);
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);

writer.WriteEndObject();
}

public void SerializeV2(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

if (this.Reference != null && writer.GetSettings().ReferenceInline != ReferenceInlineSetting.InlineReferences)
{
this.Reference.SerializeV2(writer);
return;
}

this.SerializeV2WithoutReference(writer);
}
}
}
66 changes: 66 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/PulsarServerBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using System;
using System.Collections.Generic;
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for Pulsar server settings.
/// </summary>
public class PulsarServerBinding : IServerBinding
{
/// <summary>
/// The pulsar tenant. If omitted, "public" must be assumed.
/// </summary>
public string Tenant { get; set; }

/// <summary>
/// The version of this binding.
public string BindingVersion { get; set; }

public BindingType Type => BindingType.Pulsar;

public bool UnresolvedReference { get; set; }

public AsyncApiReference Reference { get; set; }

public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();

writer.WriteOptionalProperty(AsyncApiConstants.Tenant, this.Tenant);
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);

writer.WriteEndObject();
}

public void SerializeV2(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

if (this.Reference != null && writer.GetSettings().ReferenceInline != ReferenceInlineSetting.InlineReferences)
{
this.Reference.SerializeV2(writer);
return;
}

this.SerializeV2WithoutReference(writer);
}
}
}
28 changes: 28 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/RetentionDefinition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

public class RetentionDefinition : IAsyncApiElement
{
/// <summary>
/// Time given in Minutes. 0 = Disable message retention (by default).
/// </summary>
public int Time { get; set; }

/// <summary>
/// Size given in MegaBytes. 0 = Disable message retention (by default).
/// </summary>
public int Size { get; set; }

public void Serialize(IAsyncApiWriter writer)
{
writer.WriteStartObject();
writer.WriteRequiredProperty(AsyncApiConstants.Time, this.Time);
writer.WriteRequiredProperty(AsyncApiConstants.Size, this.Size);
writer.WriteEndObject();
}
}
}
44 changes: 42 additions & 2 deletions test/LEGO.AsyncAPI.Tests/AsyncApiDocumentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using LEGO.AsyncAPI.Models.Bindings;
using LEGO.AsyncAPI.Models.Bindings.Http;
using LEGO.AsyncAPI.Models.Bindings.Kafka;
using LEGO.AsyncAPI.Models.Bindings.Pulsar;
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Readers;
using LEGO.AsyncAPI.Writers;
Expand Down Expand Up @@ -462,6 +463,9 @@ public void Serialize_WithBindings_Serializes()
url: example.com
protocol: pulsar+ssl
description: test description
bindings:
pulsar:
tenant: contoso
channels:
testChannel:
publish:
Expand All @@ -476,8 +480,19 @@ public void Serialize_WithBindings_Serializes()
bindings:
kafka:
partitions: 2
replicas: 1";

replicas: 1
pulsar:
namespace: staging
persistence: persistent
compaction: 1000
retention:
time: 4
size: 1
geo-replication:
- east
- west
ttl: 42
deduplication: true";
var doc = new AsyncApiDocument();
doc.Info = new AsyncApiInfo()
{
Expand All @@ -488,6 +503,15 @@ public void Serialize_WithBindings_Serializes()
Description = "test description",
Protocol = "pulsar+ssl",
Url = "example.com",
Bindings = new AsyncApiBindings<IServerBinding>
{
{
new PulsarServerBinding
{
Tenant = "contoso",
}
},
},
});
doc.Channels.Add("testChannel",
new AsyncApiChannel
Expand All @@ -501,6 +525,22 @@ public void Serialize_WithBindings_Serializes()
Replicas = 1,
}
},
{
new PulsarChannelBinding
{
Namespace = "staging",
Persistence = "persistent",
Compaction = 1000,
GeoReplication = new List<string>{"east", "west"},
Retention = new RetentionDefinition()
{
Time = 4,
Size = 1,
},
TTL = 42,
Deduplication = true,
}
},
},
Publish = new AsyncApiOperation
{
Expand Down