Skip to content

Commit

Permalink
add support for Avro Logical types
Browse files Browse the repository at this point in the history
  • Loading branch information
VisualBean committed Oct 14, 2024
1 parent 37ee652 commit fa84506
Show file tree
Hide file tree
Showing 16 changed files with 448 additions and 12 deletions.
153 changes: 152 additions & 1 deletion src/LEGO.AsyncAPI.Readers/V2/AsyncApiAvroSchemaDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
namespace LEGO.AsyncAPI.Readers
{
using System;
using System.Threading;
using LEGO.AsyncAPI.Exceptions;
using LEGO.AsyncAPI.Models;
using LEGO.AsyncAPI.Models.Avro.LogicalTypes;
using LEGO.AsyncAPI.Readers.Exceptions;
using LEGO.AsyncAPI.Readers.ParseNodes;
using LEGO.AsyncAPI.Writers;
Expand Down Expand Up @@ -68,6 +70,60 @@ public class AsyncApiAvroSchemaDeserializer
{ "types", (a, n) => a.Types = n.CreateList(LoadSchema) },
};

private static readonly FixedFieldMap<AvroDecimal> DecimalFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
{ "precision", (a, n) => a.Precision = int.Parse(n.GetScalarValue()) },
{ "scale", (a, n) => a.Scale = int.Parse(n.GetScalarValue()) },
};

private static readonly FixedFieldMap<AvroUUID> UUIDFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroDate> DateFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimeMillis> TimeMillisFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimeMicros> TimeMicrosFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimestampMillis> TimestampMillisFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroTimestampMicros> TimestampMicrosFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
};

private static readonly FixedFieldMap<AvroDuration> DurationFixedFields = new()
{
{ "type", (a, n) => { } },
{ "logicalType", (a, n) => { } },
{ "name", (a, n) => a.Name = n.GetScalarValue() },
{ "namespace", (a, n) => a.Namespace = n.GetScalarValue() },
{ "aliases", (a, n) => a.Aliases = n.CreateSimpleList(n2 => n2.GetScalarValue()) },
{ "size", (a, n) => { } },
};

private static readonly PatternFieldMap<AvroRecord> RecordMetadataPatternFields =
new()
{
Expand Down Expand Up @@ -110,6 +166,54 @@ public class AsyncApiAvroSchemaDeserializer
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroDecimal> DecimalMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroUUID> UUIDMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroDate> DateMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimeMillis> TimeMillisMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimeMicros> TimeMicrosMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimestampMillis> TimestampMillisMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroTimestampMicros> TimestampMicrosMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

private static readonly PatternFieldMap<AvroDuration> DurationMetadataPatternFields =
new()
{
{ s => s.StartsWith(string.Empty), (a, p, n) => a.Metadata[p] = n.CreateAny() },
};

public static AvroSchema LoadSchema(ParseNode node)
{
if (node is ValueNode valueNode)
Expand Down Expand Up @@ -141,8 +245,13 @@ public static AvroSchema LoadSchema(ParseNode node)
};
}

var type = mapNode["type"]?.Value.GetScalarValue();
var isLogicalType = mapNode["logicalType"] != null;
if (isLogicalType)
{
return LoadLogicalType(mapNode);
}

var type = mapNode["type"]?.Value.GetScalarValue();
switch (type)
{
case "record":
Expand Down Expand Up @@ -177,6 +286,48 @@ public static AvroSchema LoadSchema(ParseNode node)
throw new AsyncApiReaderException("Invalid node type");
}

private static AvroSchema LoadLogicalType(MapNode mapNode)
{
var type = mapNode["logicalType"]?.Value.GetScalarValue();
switch (type)
{
case "decimal":
var @decimal = new AvroDecimal();
mapNode.ParseFields(ref @decimal, DecimalFixedFields, DecimalMetadataPatternFields);
return @decimal;
case "uuid":
var uuid = new AvroUUID();
mapNode.ParseFields(ref uuid, UUIDFixedFields, UUIDMetadataPatternFields);
return uuid;
case "date":
var date = new AvroDate();
mapNode.ParseFields(ref date, DateFixedFields, DateMetadataPatternFields);
return date;
case "time-millis":
var timeMillis = new AvroTimeMillis();
mapNode.ParseFields(ref timeMillis, TimeMillisFixedFields, TimeMillisMetadataPatternFields);
return timeMillis;
case "time-micros":
var timeMicros = new AvroTimeMicros();
mapNode.ParseFields(ref timeMicros, TimeMicrosFixedFields, TimeMicrosMetadataPatternFields);
return timeMicros;
case "timestamp-millis":
var timestampMillis = new AvroTimestampMillis();
mapNode.ParseFields(ref timestampMillis, TimestampMillisFixedFields, TimestampMillisMetadataPatternFields);
return timestampMillis;
case "timestamp-micros":
var timestampMicros = new AvroTimestampMicros();
mapNode.ParseFields(ref timestampMicros, TimestampMicrosFixedFields, TimestampMicrosMetadataPatternFields);
return timestampMicros;
case "duration":
var duration = new AvroDuration();
mapNode.ParseFields(ref duration, DurationFixedFields, DurationMetadataPatternFields);
return duration;
default:
throw new AsyncApiException($"Unsupported type: {type}");
}
}

private static AvroField LoadField(ParseNode node)
{
var mapNode = node.CheckMapNode("field");
Expand Down
1 change: 0 additions & 1 deletion src/LEGO.AsyncAPI/Models/Avro/AvroMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace LEGO.AsyncAPI.Models
{
using System;
using System.Collections.Generic;
using System.Linq;
using LEGO.AsyncAPI.Writers;
Expand Down
4 changes: 0 additions & 4 deletions src/LEGO.AsyncAPI/Models/Avro/AvroPrimitive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ public AvroPrimitive(AvroPrimitiveType type)
this.Type = type.GetDisplayName();
}

public AvroPrimitive()
{
}

public override void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
writer.WriteValue(this.Type);
Expand Down
14 changes: 14 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroDate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroDate : AvroLogicalType
{
public AvroDate()
: base(AvroPrimitiveType.Int)
{
}

public override LogicalType LogicalType => LogicalType.Date;
}
}
26 changes: 26 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroDecimal.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
using LEGO.AsyncAPI.Writers;

public class AvroDecimal : AvroLogicalType
{
public AvroDecimal()
: base(AvroPrimitiveType.Bytes)
{
}

public override LogicalType LogicalType => LogicalType.Decimal;

public int? Scale { get; set; }

public int? Precision { get; set; }

public override void SerializeV2Core(IAsyncApiWriter writer)
{
writer.WriteOptionalProperty("scale", this.Scale);
writer.WriteOptionalProperty("precision", this.Precision);
}
}
}
42 changes: 42 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroDuration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
using System.Linq;
using LEGO.AsyncAPI.Writers;

public class AvroDuration : AvroFixed
{
public LogicalType LogicalType { get; } = LogicalType.Duration;

public new int Size { get; } = 12;

public override void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
writer.WriteStartObject();
writer.WriteOptionalProperty("type", this.Type);
writer.WriteOptionalProperty("logicalType", this.LogicalType.GetDisplayName());
writer.WriteRequiredProperty("name", this.Name);
writer.WriteOptionalProperty("namespace", this.Namespace);
writer.WriteOptionalCollection("aliases", this.Aliases, (w, s) => w.WriteValue(s));
writer.WriteRequiredProperty("size", this.Size);
if (this.Metadata.Any())
{
foreach (var item in this.Metadata)
{
writer.WritePropertyName(item.Key);
if (item.Value == null)
{
writer.WriteNull();
}
else
{
writer.WriteAny(item.Value);
}
}
}

writer.WriteEndObject();
}
}
}
48 changes: 48 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroLogicalType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
using System.Linq;
using LEGO.AsyncAPI.Writers;

public abstract class AvroLogicalType : AvroPrimitive
{
protected AvroLogicalType(AvroPrimitiveType type)
: base(type)
{
}

public abstract LogicalType LogicalType { get; }

public virtual void SerializeV2Core(IAsyncApiWriter writer)
{
}

public override void SerializeV2WithoutReference(IAsyncApiWriter writer)
{
writer.WriteStartObject();
writer.WriteOptionalProperty("type", this.Type);
writer.WriteOptionalProperty("logicalType", this.LogicalType.GetDisplayName());

this.SerializeV2Core(writer);

if (this.Metadata.Any())
{
foreach (var item in this.Metadata)
{
writer.WritePropertyName(item.Key);
if (item.Value == null)
{
writer.WriteNull();
}
else
{
writer.WriteAny(item.Value);
}
}
}

writer.WriteEndObject();
}
}
}
14 changes: 14 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroTimeMicros.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroTimeMicros : AvroLogicalType
{
public AvroTimeMicros()
: base(AvroPrimitiveType.Long)
{
}

public override LogicalType LogicalType => LogicalType.Time_Micros;
}
}
14 changes: 14 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroTimeMillis.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroTimeMillis : AvroLogicalType
{
public AvroTimeMillis()
: base(AvroPrimitiveType.Int)
{
}

public override LogicalType LogicalType => LogicalType.Time_Millis;
}
}
14 changes: 14 additions & 0 deletions src/LEGO.AsyncAPI/Models/Avro/LogicalTypes/AvroTimestampMicros.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) The LEGO Group. All rights reserved.

namespace LEGO.AsyncAPI.Models.Avro.LogicalTypes
{
public class AvroTimestampMicros : AvroLogicalType
{
public AvroTimestampMicros()
: base(AvroPrimitiveType.Long)
{
}

public override LogicalType LogicalType => LogicalType.Timestamp_Micros;
}
}
Loading

0 comments on commit fa84506

Please sign in to comment.