Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings): add pulsar bindings #74

Merged
merged 8 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Readers
{
using LEGO.AsyncAPI.Models.Bindings.Pulsar;
using LEGO.AsyncAPI.Readers.ParseNodes;
using LEGO.AsyncAPI.Writers;

internal static partial class AsyncApiV2Deserializer
{
private static FixedFieldMap<PulsarServerBinding> pulsarServerBindingFixedFields = new ()
{
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
{ "tenant", (a, n) => { a.Tenant = n.GetScalarValue(); } },
};

private static FixedFieldMap<PulsarChannelBinding> pulsarChannelBindingFixedFields = new ()
{
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
{ "namespace", (a, n) => { a.Namespace = n.GetScalarValue(); } },
{ "persistence", (a, n) => { a.Persistence = n.GetScalarValue().GetEnumFromDisplayName<Persistence>(); } },
{ "compaction", (a, n) => { a.Compaction = n.GetIntegerValue(); } },
{ "retention", (a, n) => { a.Retention = LoadRetention(n); } },
{ "geo-replication", (a, n) => { a.GeoReplication = n.CreateSimpleList(s => s.GetScalarValue()); } },
{ "ttl", (a, n) => { a.TTL = n.GetIntegerValue(); } },
{ "deduplication", (a, n) => { a.Deduplication = n.GetBooleanValue(); } },
};

private static FixedFieldMap<RetentionDefinition> pulsarServerBindingRetentionFixedFields = new ()
{
{ "time", (a, n) => { a.Time = n.GetIntegerValue(); } },
{ "size", (a, n) => { a.Size = n.GetIntegerValue(); } },
};

private static RetentionDefinition LoadRetention(ParseNode node)
{
var mapNode = node.CheckMapNode("retention");
var retention = new RetentionDefinition();
ParseMap(mapNode, retention, pulsarServerBindingRetentionFixedFields, null);
return retention;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ internal static IChannelBinding LoadChannelBinding(ParseNode node)
{
case BindingType.Kafka:
return LoadBinding("ChannelBinding", property.Value, kafkaChannelBindingFixedFields);
case BindingType.Pulsar:
return LoadBinding("ChannelBinding", property.Value, pulsarChannelBindingFixedFields);
case BindingType.Websockets:
return LoadBinding("ChannelBinding", property.Value, webSocketsChannelBindingFixedFields);
default:
Expand Down
9 changes: 5 additions & 4 deletions src/LEGO.AsyncAPI.Readers/V2/AsyncApiChannelDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace LEGO.AsyncAPI.Readers

internal static partial class AsyncApiV2Deserializer
{
private static readonly FixedFieldMap<AsyncApiChannel> channelFixedFields = new()
private static readonly FixedFieldMap<AsyncApiChannel> ChannelFixedFields = new ()
{
{ "description", (a, n) => { a.Description = n.GetScalarValue(); } },
{ "servers", (a, n) => { a.Servers = n.CreateSimpleList(s => s.GetScalarValue()); } },
Expand All @@ -18,8 +18,8 @@ internal static partial class AsyncApiV2Deserializer
{ "bindings", (a, n) => { a.Bindings = LoadChannelBindings(n); } },
};

private static readonly PatternFieldMap<AsyncApiChannel> channelPatternFields =
new()
private static readonly PatternFieldMap<AsyncApiChannel> ChannelPatternFields =
new ()
{
{ s => s.StartsWith("x-"), (a, p, n) => a.AddExtension(p, LoadExtension(p, n)) },
};
Expand All @@ -32,9 +32,10 @@ public static AsyncApiChannel LoadChannel(ParseNode node)
{
return mapNode.GetReferencedObject<AsyncApiChannel>(ReferenceType.Channel, pointer);
}

var pathItem = new AsyncApiChannel();

ParseMap(mapNode, pathItem, channelFixedFields, channelPatternFields);
ParseMap(mapNode, pathItem, ChannelFixedFields, ChannelPatternFields);

return pathItem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ internal static IServerBinding LoadServerBinding(ParseNode node)
{
case BindingType.Kafka:
return LoadBinding("ServerBinding", property.Value, kafkaServerBindingFixedFields);
case BindingType.Pulsar:
return LoadBinding("ServerBinding", property.Value, pulsarServerBindingFixedFields);
default:
throw new AsyncApiException($"ServerBinding {property.Name} is not supported");
}
Expand Down
2 changes: 2 additions & 0 deletions src/LEGO.AsyncAPI/Models/AsyncApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public static class AsyncApiConstants
public const string Size = "size";
public const string Persistence = "persistence";
public const string Compaction = "compaction";
public const string TTL = "ttl";
public const string Tenant = "tenant";
public const string Namespace = "namespace";
public const string ServerVariables = "serverVariables";
Expand All @@ -136,5 +137,6 @@ public static class AsyncApiConstants
public const string DeleteRetentionMiliseconds = "delete.retention.ms";
public const string MaxMessageBytes = "max.message.bytes";
public const string TopicConfiguration = "topicConfiguration";
public const string GeoReplication = "geo-replication";
}
}
3 changes: 3 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/BindingType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ public enum BindingType

[Display("websockets")]
Websockets,

[Display("pulsar")]
Pulsar,
}
}
15 changes: 15 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/Persistence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using LEGO.AsyncAPI.Attributes;

public enum Persistence
{
[Display("persistent")]
Persistent,

[Display("non-persistent")]
NonPersistent,
}
}
101 changes: 101 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/PulsarChannelBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using System;
using System.Collections.Generic;
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for Pulsar server settings.
/// </summary>
public class PulsarChannelBinding : IChannelBinding
{
/// <summary>
/// The namespace associated with the topic.
/// </summary>
public string Namespace { get; set; }

/// <summary>
/// persistence of the topic in Pulsar persistent or non-persistent.
/// </summary>
public Persistence Persistence { get; set; }

/// <summary>
/// Topic compaction threshold given in bytes.
/// </summary>
public int Compaction { get; set; }

/// <summary>
/// A list of clusters the topic is replicated to.
/// </summary>
public IEnumerable<string> GeoReplication { get; set; }

/// <summary>
/// Topic retention policy.
/// </summary>
public RetentionDefinition Retention { get; set; }

/// <summary>
/// Message Time-to-live in seconds.
/// </summary>
public int TTL { get; set; }

/// <summary>
/// When Message deduplication is enabled, it ensures that each message produced on Pulsar topics is persisted to disk only once.
/// </summary>
public bool Deduplication { get; set; }

/// <summary>
/// The version of this binding.
public string BindingVersion { get; set; }

public BindingType Type => BindingType.Pulsar;

public bool UnresolvedReference { get; set; }

public AsyncApiReference Reference { get; set; }

public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();
writer.WriteRequiredProperty(AsyncApiConstants.Namespace, this.Namespace);
writer.WriteRequiredProperty(AsyncApiConstants.Persistence, this.Persistence.GetDisplayName());
writer.WriteOptionalProperty<int>(AsyncApiConstants.Compaction, this.Compaction);
writer.WriteOptionalCollection(AsyncApiConstants.GeoReplication, this.GeoReplication, (v, s) => v.WriteValue(s));
writer.WriteOptionalObject(AsyncApiConstants.Retention, this.Retention, (w, r) => r.Serialize(w));
writer.WriteOptionalProperty<int>(AsyncApiConstants.TTL, this.TTL);
writer.WriteOptionalProperty(AsyncApiConstants.Deduplication, this.Deduplication);
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);

writer.WriteEndObject();
}

public void SerializeV2(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

if (this.Reference != null && writer.GetSettings().ReferenceInline != ReferenceInlineSetting.InlineReferences)
{
this.Reference.SerializeV2(writer);
return;
}

this.SerializeV2WithoutReference(writer);
}
}
}
66 changes: 66 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/PulsarServerBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using System;
using System.Collections.Generic;
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

/// <summary>
/// Binding class for Pulsar server settings.
/// </summary>
public class PulsarServerBinding : IServerBinding
{
/// <summary>
/// The pulsar tenant. If omitted, "public" must be assumed.
/// </summary>
public string Tenant { get; set; }

/// <summary>
/// The version of this binding.
public string BindingVersion { get; set; }

public BindingType Type => BindingType.Pulsar;

public bool UnresolvedReference { get; set; }

public AsyncApiReference Reference { get; set; }

public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();

/// <summary>
/// Serialize to AsyncAPI V2 document without using reference.
/// </summary>
public void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

writer.WriteStartObject();

writer.WriteOptionalProperty(AsyncApiConstants.Tenant, this.Tenant);
writer.WriteOptionalProperty(AsyncApiConstants.BindingVersion, this.BindingVersion);

writer.WriteEndObject();
}

public void SerializeV2(IAsyncApiWriter writer)
{
if (writer is null)
{
throw new ArgumentNullException(nameof(writer));
}

if (this.Reference != null && writer.GetSettings().ReferenceInline != ReferenceInlineSetting.InlineReferences)
{
this.Reference.SerializeV2(writer);
return;
}

this.SerializeV2WithoutReference(writer);
}
}
}
28 changes: 28 additions & 0 deletions src/LEGO.AsyncAPI/Models/Bindings/Pulsar/RetentionDefinition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Bindings.Pulsar
{
using LEGO.AsyncAPI.Models.Interfaces;
using LEGO.AsyncAPI.Writers;

public class RetentionDefinition : IAsyncApiElement
{
/// <summary>
/// Time given in Minutes. 0 = Disable message retention (by default).
/// </summary>
public int Time { get; set; }

/// <summary>
/// Size given in MegaBytes. 0 = Disable message retention (by default).
/// </summary>
public int Size { get; set; }

public void Serialize(IAsyncApiWriter writer)
{
writer.WriteStartObject();
writer.WriteRequiredProperty(AsyncApiConstants.Time, this.Time);
writer.WriteRequiredProperty(AsyncApiConstants.Size, this.Size);
writer.WriteEndObject();
}
}
}
Loading