diff --git a/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloKinesisStreamConfigurator.cs b/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloKinesisStreamConfigurator.cs index ddb14bf438..2bc9ccf0b1 100644 --- a/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloKinesisStreamConfigurator.cs +++ b/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloKinesisStreamConfigurator.cs @@ -33,6 +33,12 @@ public SiloKinesisStreamConfigurator ConfigureKinesis(Action( Func checkpointerFactoryBuilder, Action> configureOptions) diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs index 53aaaa8c82..5c66811a63 100644 --- a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs @@ -3,6 +3,7 @@ using System.IO; using System.Linq; using System.Threading.Tasks; +using Amazon; using Amazon.Kinesis; using Amazon.Kinesis.Model; using Amazon.Runtime; @@ -19,10 +20,10 @@ namespace Orleans.Streaming.Kinesis /// /// Queue adapter factory which allows the PersistentStreamProvider to use AWS Kinesis Data Streams as its backend persistent event queue. /// - public class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter + internal class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter { private readonly KinesisStreamOptions _options; - private readonly Serializer _serializer; + private readonly Serializer _serializer; private readonly IStreamQueueCheckpointerFactory _checkpointerFactory; private readonly ILoggerFactory _loggerFactory; private readonly IQueueAdapterCache _adapterCache; @@ -36,7 +37,7 @@ public KinesisAdapterFactory( string name, KinesisStreamOptions options, SimpleQueueCacheOptions cacheOptions, - Serializer serializer, + Serializer serializer, IStreamQueueCheckpointerFactory checkpointerFactory, ILoggerFactory loggerFactory ) @@ -69,10 +70,9 @@ public static KinesisAdapterFactory Create(IServiceProvider services, string nam { var streamsConfig = services.GetOptionsByName(name); var cacheOptions = services.GetOptionsByName(name); - var serializer = services.GetRequiredService(); - var logger = services.GetRequiredService(); - var grainFactory = services.GetRequiredService(); + var serializer = services.GetRequiredService>(); var checkpointerFactory = services.GetRequiredKeyedService(name); + var logger = services.GetRequiredService(); var factory = ActivatorUtilities.CreateInstance( services, @@ -81,9 +81,7 @@ public static KinesisAdapterFactory Create(IServiceProvider services, string nam cacheOptions, serializer, checkpointerFactory, - logger, - grainFactory, - services + logger ); return factory; @@ -137,7 +135,7 @@ public IQueueAdapterReceiver CreateReceiver(QueueId queueId) ); } - private AmazonKinesisClient CreateClient() + internal AmazonKinesisClient CreateClient() { if (_options.Service.StartsWith("http://", StringComparison.OrdinalIgnoreCase) || _options.Service.StartsWith("https://", StringComparison.OrdinalIgnoreCase)) @@ -153,16 +151,16 @@ private AmazonKinesisClient CreateClient() { // AWS Kinesis instance (auth via explicit credentials) var credentials = new BasicAWSCredentials(_options.AccessKey, _options.SecretKey); - return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = AWSUtils.GetRegionEndpoint(_options.Service) }); + return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) }); } else { // AWS Kinesis instance (implicit auth - EC2 IAM Roles etc) - return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = AWSUtils.GetRegionEndpoint(_options.Service) }); + return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) }); } } - private async Task GetPartitionIdsAsync() + internal async Task GetPartitionIdsAsync() { var request = new ListShardsRequest { diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs index 80fd270389..4ce3654716 100644 --- a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs @@ -10,14 +10,14 @@ namespace Orleans.Streaming.Kinesis { - public class KinesisAdapterReceiver : IQueueAdapterReceiver + internal class KinesisAdapterReceiver : IQueueAdapterReceiver { private readonly ILogger _logger; private readonly AmazonKinesisClient _client; private readonly string _streamName; private readonly string _partition; private readonly IStreamQueueCheckpointerFactory _checkpointerFactory; - private readonly Serializer _serializer; + private readonly Serializer _serializer; private IStreamQueueCheckpointer _checkpointer; private string _shardIterator; @@ -28,7 +28,7 @@ internal KinesisAdapterReceiver( string streamName, string partition, IStreamQueueCheckpointerFactory checkpointerFactory, - Serializer serializer, + Serializer serializer, ILoggerFactory loggerFactory ) { diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs index 2130934d08..86e13327a7 100644 --- a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs @@ -11,7 +11,7 @@ namespace Orleans.Streaming.Kinesis { [Serializable] [Orleans.GenerateSerializer] - public class KinesisBatchContainer : IBatchContainer, IComparable + internal class KinesisBatchContainer : IBatchContainer, IComparable { [JsonProperty] [Id(0)] @@ -23,13 +23,13 @@ public class KinesisBatchContainer : IBatchContainer, IComparable Serializer { get; set; } [JsonProperty] [Id(1)] internal KinesisSequenceToken Token { get; } - private KinesisBatchContainer(Record record, Serializer serializer, long sequenceId) + private KinesisBatchContainer(Record record, Serializer serializer, long sequenceId) { this.Serializer = serializer; this._rawRecord = record.Data.ToArray(); @@ -37,6 +37,12 @@ private KinesisBatchContainer(Record record, Serializer serializer, long sequenc Token = new KinesisSequenceToken(record.SequenceNumber, sequenceId, 0); } + [GeneratedActivatorConstructor] + internal KinesisBatchContainer(Serializer serializer) + { + this.Serializer = serializer; + } + /// /// Stream identifier for the stream this batch is part of. /// @@ -47,7 +53,7 @@ private KinesisBatchContainer(Record record, Serializer serializer, long sequenc /// public StreamSequenceToken SequenceToken => Token; - private Body GetPayload() => _payload ?? (_payload = this.Serializer.Deserialize(_rawRecord)); + private Body GetPayload() => _payload ?? (_payload = this.Serializer.Deserialize(_rawRecord)); /// /// Gets events of a specific type from the batch. @@ -77,7 +83,21 @@ public bool ImportRequestContext() public int CompareTo(KinesisBatchContainer other) => Token.SequenceNumber.CompareTo(other.SequenceToken.SequenceNumber); - internal static byte[] ToKinesisPayload(Serializer serializer, StreamId streamId, IEnumerable events, Dictionary requestContext) + [Serializable] + [GenerateSerializer] + internal class Body + { + [Id(0)] + public List Events { get; set; } + + [Id(1)] + public Dictionary RequestContext { get; set; } + + [Id(2)] + public StreamId StreamId { get; set; } + } + + internal static byte[] ToKinesisPayload(Serializer serializer, StreamId streamId, IEnumerable events, Dictionary requestContext) { var payload = new Body { @@ -89,21 +109,9 @@ internal static byte[] ToKinesisPayload(Serializer serializer, StreamId strea return serializer.SerializeToArray(payload); } - internal static KinesisBatchContainer FromKinesisRecord(Serializer serializer, Record record, long sequenceId) + internal static KinesisBatchContainer FromKinesisRecord(Serializer serializer, Record record, long sequenceId) { return new KinesisBatchContainer(record, serializer, sequenceId); } - - [Serializable] - [GenerateSerializer] - internal class Body - { - [Id(0)] - public List Events { get; set; } - [Id(1)] - public Dictionary RequestContext { get; set; } - [Id(2)] - public StreamId StreamId { get; set; } - } } } diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisSequenceToken.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisSequenceToken.cs index b618061808..cdfe63b516 100644 --- a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisSequenceToken.cs +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisSequenceToken.cs @@ -7,7 +7,7 @@ namespace Orleans.Streaming.Kinesis { [Serializable] [GenerateSerializer] - public class KinesisSequenceToken : EventSequenceTokenV2 + internal class KinesisSequenceToken : EventSequenceTokenV2 { /// /// Initializes a new instance of the class.