Skip to content

Commit

Permalink
Fixup serialization issues
Browse files Browse the repository at this point in the history
  • Loading branch information
dpbevin committed Jun 2, 2024
1 parent a101fc6 commit 5b93e76
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public SiloKinesisStreamConfigurator ConfigureKinesis(Action<KinesisStreamOption
return this;
}

public SiloKinesisStreamConfigurator UseGrainCheckpointer()
{
this.ConfigureComponent(GrainCheckpointFactory.CreateFactory);
return this;
}

public SiloKinesisStreamConfigurator ConfigureCheckpointer<TOptions>(
Func<IServiceProvider, string, IStreamQueueCheckpointerFactory> checkpointerFactoryBuilder,
Action<OptionsBuilder<TOptions>> configureOptions)
Expand Down
24 changes: 11 additions & 13 deletions src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Amazon.Runtime;
Expand All @@ -19,10 +20,10 @@ namespace Orleans.Streaming.Kinesis
/// <summary>
/// Queue adapter factory which allows the PersistentStreamProvider to use AWS Kinesis Data Streams as its backend persistent event queue.
/// </summary>
public class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter
internal class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter
{
private readonly KinesisStreamOptions _options;
private readonly Serializer _serializer;
private readonly Serializer<KinesisBatchContainer.Body> _serializer;
private readonly IStreamQueueCheckpointerFactory _checkpointerFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly IQueueAdapterCache _adapterCache;
Expand All @@ -36,7 +37,7 @@ public KinesisAdapterFactory(
string name,
KinesisStreamOptions options,
SimpleQueueCacheOptions cacheOptions,
Serializer serializer,
Serializer<KinesisBatchContainer.Body> serializer,
IStreamQueueCheckpointerFactory checkpointerFactory,
ILoggerFactory loggerFactory
)
Expand Down Expand Up @@ -69,10 +70,9 @@ public static KinesisAdapterFactory Create(IServiceProvider services, string nam
{
var streamsConfig = services.GetOptionsByName<KinesisStreamOptions>(name);
var cacheOptions = services.GetOptionsByName<SimpleQueueCacheOptions>(name);
var serializer = services.GetRequiredService<Serializer>();
var logger = services.GetRequiredService<ILoggerFactory>();
var grainFactory = services.GetRequiredService<IGrainFactory>();
var serializer = services.GetRequiredService<Serializer<KinesisBatchContainer.Body>>();
var checkpointerFactory = services.GetRequiredKeyedService<IStreamQueueCheckpointerFactory>(name);
var logger = services.GetRequiredService<ILoggerFactory>();

var factory = ActivatorUtilities.CreateInstance<KinesisAdapterFactory>(
services,
Expand All @@ -81,9 +81,7 @@ public static KinesisAdapterFactory Create(IServiceProvider services, string nam
cacheOptions,
serializer,
checkpointerFactory,
logger,
grainFactory,
services
logger
);

return factory;
Expand Down Expand Up @@ -137,7 +135,7 @@ public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
);
}

private AmazonKinesisClient CreateClient()
internal AmazonKinesisClient CreateClient()
{
if (_options.Service.StartsWith("http://", StringComparison.OrdinalIgnoreCase) ||
_options.Service.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
Expand All @@ -153,16 +151,16 @@ private AmazonKinesisClient CreateClient()
{
// AWS Kinesis instance (auth via explicit credentials)
var credentials = new BasicAWSCredentials(_options.AccessKey, _options.SecretKey);
return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = AWSUtils.GetRegionEndpoint(_options.Service) });
return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) });
}
else
{
// AWS Kinesis instance (implicit auth - EC2 IAM Roles etc)
return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = AWSUtils.GetRegionEndpoint(_options.Service) });
return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) });
}
}

private async Task<string[]> GetPartitionIdsAsync()
internal async Task<string[]> GetPartitionIdsAsync()
{
var request = new ListShardsRequest
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

namespace Orleans.Streaming.Kinesis
{
public class KinesisAdapterReceiver : IQueueAdapterReceiver
internal class KinesisAdapterReceiver : IQueueAdapterReceiver
{
private readonly ILogger<KinesisAdapterReceiver> _logger;
private readonly AmazonKinesisClient _client;
private readonly string _streamName;
private readonly string _partition;
private readonly IStreamQueueCheckpointerFactory _checkpointerFactory;
private readonly Serializer _serializer;
private readonly Serializer<KinesisBatchContainer.Body> _serializer;

private IStreamQueueCheckpointer<string> _checkpointer;
private string _shardIterator;
Expand All @@ -28,7 +28,7 @@ internal KinesisAdapterReceiver(
string streamName,
string partition,
IStreamQueueCheckpointerFactory checkpointerFactory,
Serializer serializer,
Serializer<KinesisBatchContainer.Body> serializer,
ILoggerFactory loggerFactory
)
{
Expand Down
44 changes: 26 additions & 18 deletions src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Orleans.Streaming.Kinesis
{
[Serializable]
[Orleans.GenerateSerializer]
public class KinesisBatchContainer : IBatchContainer, IComparable<KinesisBatchContainer>
internal class KinesisBatchContainer : IBatchContainer, IComparable<KinesisBatchContainer>
{
[JsonProperty]
[Id(0)]
Expand All @@ -23,20 +23,26 @@ public class KinesisBatchContainer : IBatchContainer, IComparable<KinesisBatchCo

[JsonIgnore]
[field: NonSerialized]
internal Serializer Serializer { get; set; }
internal Serializer<KinesisBatchContainer.Body> Serializer { get; set; }

[JsonProperty]
[Id(1)]
internal KinesisSequenceToken Token { get; }

private KinesisBatchContainer(Record record, Serializer serializer, long sequenceId)
private KinesisBatchContainer(Record record, Serializer<KinesisBatchContainer.Body> serializer, long sequenceId)
{
this.Serializer = serializer;
this._rawRecord = record.Data.ToArray();

Token = new KinesisSequenceToken(record.SequenceNumber, sequenceId, 0);
}

[GeneratedActivatorConstructor]
internal KinesisBatchContainer(Serializer<KinesisBatchContainer.Body> serializer)
{
this.Serializer = serializer;
}

/// <summary>
/// Stream identifier for the stream this batch is part of.
/// </summary>
Expand All @@ -47,7 +53,7 @@ private KinesisBatchContainer(Record record, Serializer serializer, long sequenc
/// </summary>
public StreamSequenceToken SequenceToken => Token;

private Body GetPayload() => _payload ?? (_payload = this.Serializer.Deserialize<Body>(_rawRecord));
private Body GetPayload() => _payload ?? (_payload = this.Serializer.Deserialize(_rawRecord));

/// <summary>
/// Gets events of a specific type from the batch.
Expand Down Expand Up @@ -77,7 +83,21 @@ public bool ImportRequestContext()
public int CompareTo(KinesisBatchContainer other)
=> Token.SequenceNumber.CompareTo(other.SequenceToken.SequenceNumber);

internal static byte[] ToKinesisPayload<T>(Serializer serializer, StreamId streamId, IEnumerable<T> events, Dictionary<string, object> requestContext)
[Serializable]
[GenerateSerializer]
internal class Body
{
[Id(0)]
public List<object> Events { get; set; }

[Id(1)]
public Dictionary<string, object> RequestContext { get; set; }

[Id(2)]
public StreamId StreamId { get; set; }
}

internal static byte[] ToKinesisPayload<T>(Serializer<KinesisBatchContainer.Body> serializer, StreamId streamId, IEnumerable<T> events, Dictionary<string, object> requestContext)
{
var payload = new Body
{
Expand All @@ -89,21 +109,9 @@ internal static byte[] ToKinesisPayload<T>(Serializer serializer, StreamId strea
return serializer.SerializeToArray(payload);
}

internal static KinesisBatchContainer FromKinesisRecord(Serializer serializer, Record record, long sequenceId)
internal static KinesisBatchContainer FromKinesisRecord(Serializer<KinesisBatchContainer.Body> serializer, Record record, long sequenceId)
{
return new KinesisBatchContainer(record, serializer, sequenceId);
}

[Serializable]
[GenerateSerializer]
internal class Body
{
[Id(0)]
public List<object> Events { get; set; }
[Id(1)]
public Dictionary<string, object> RequestContext { get; set; }
[Id(2)]
public StreamId StreamId { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Orleans.Streaming.Kinesis
{
[Serializable]
[GenerateSerializer]
public class KinesisSequenceToken : EventSequenceTokenV2
internal class KinesisSequenceToken : EventSequenceTokenV2
{
/// <summary>
/// Initializes a new instance of the <see cref="KinesisSequenceToken" /> class.
Expand Down

0 comments on commit 5b93e76

Please sign in to comment.