From 5af07a0606de1c6382e65886a24c146269f78c68 Mon Sep 17 00:00:00 2001
From: MASES Public Developers Team
<94312179+masesdevelopers@users.noreply.github.com>
Date: Sat, 21 Oct 2023 03:16:10 +0200
Subject: [PATCH] Updates on documentation (#129)
* Code documentation update
* Changed DbName into DatabaseName to have a common property name, plus fixed UseNameMatching in KafkaDbContext
* Removed other warning messages, plus an unused class
* Added first level cache
---
src/documentation/articles/howitworks.md | 6 +-
src/documentation/articles/kafkadbcontext.md | 3 +-
...fkaCSharpRuntimeAnnotationCodeGenerator.cs | 7 +-
.../Internal/KafkaDesignTimeServices.cs | 4 +-
.../Internal/KafkaLoggerExtensions.cs | 3 -
.../Internal/KafkaLoggingDefinitions.cs | 3 -
src/net/KEFCore/Diagnostics/KafkaEventId.cs | 3 -
.../KafkaDatabaseFacadeExtensions.cs | 3 -
.../KafkaDbContextOptionsExtensions.cs | 21 ++--
.../KafkaEntityTypeBuilderExtensions.cs | 3 -
.../Extensions/KafkaEntityTypeExtensions.cs | 27 +++--
.../KafkaServiceCollectionExtensions.cs | 3 -
...kaDbContextOptionsBuilderInfrastructure.cs | 5 +-
.../Internal/IKafkaSingletonOptions.cs | 43 +++----
.../Internal/KafkaModelValidator.cs | 5 +-
.../Internal/KafkaOptionsExtension.cs | 105 ++++++++++--------
.../Internal/KafkaSingletonOptions.cs | 47 ++++----
.../KEFCore/Infrastructure/KafkaDbContext.cs | 13 ++-
.../KafkaDbContextOptionsBuilder.cs | 4 +-
.../KEFCore/Query/Internal/AnonymousObject.cs | 4 +-
.../Query/Internal/KafkaQueryContext.cs | 13 ++-
.../Internal/KafkaQueryContextFactory.cs | 9 +-
.../Query/Internal/KafkaQueryExpression.cs | 6 +-
...yableMethodTranslatingExpressionVisitor.cs | 84 +++++++-------
...kaShapedQueryCompilingExpressionVisitor.cs | 8 +-
...afkaShapedQueryExpressionVisitorFactory.cs | 5 +-
.../Query/Internal/KafkaTableExpression.cs | 13 ++-
.../Internal/SingleResultShaperExpression.cs | 17 ++-
.../Storage/Internal/EntityTypeProducer.cs | 37 +++---
.../Storage/Internal/EntityTypeProducers.cs | 9 +-
.../Storage/Internal/IEntityTypeProducer.cs | 13 ++-
.../KEFCore/Storage/Internal/IKafkaCluster.cs | 35 ++++--
.../Storage/Internal/IKafkaClusterCache.cs | 7 +-
.../Storage/Internal/IKafkaDatabase.cs | 15 ++-
.../KEFCore/Storage/Internal/IKafkaRowBag.cs | 7 +-
.../KEFCore/Storage/Internal/IKafkaTable.cs | 33 ++++--
.../Storage/Internal/IKafkaTableFactory.cs | 10 +-
.../KEFCore/Storage/Internal/KafkaCluster.cs | 24 ++--
.../Storage/Internal/KafkaClusterCache.cs | 8 +-
.../Internal/KafkaClusterCacheExtensions.cs | 3 +
.../KEFCore/Storage/Internal/KafkaDatabase.cs | 18 +--
.../Storage/Internal/KafkaDatabaseCreator.cs | 21 ++--
.../KEFCore/Storage/Internal/KafkaRowBag.cs | 19 +++-
.../Internal/KafkaStreamsBaseRetriever.cs | 10 +-
.../Internal/KafkaStreamsTableRetriever.cs | 8 +-
.../KEFCore/Storage/Internal/KafkaTable.cs | 30 ++---
.../Storage/Internal/KafkaTableFactory.cs | 8 +-
.../Storage/Internal/KafkaTableSnapshot.cs | 39 -------
.../Storage/Internal/KafkaTransaction.cs | 13 ++-
.../Internal/KafkaTransactionManager.cs | 26 +++--
.../Storage/Internal/KafkaTypeMapping.cs | 5 +-
.../Internal/KafkaTypeMappingSource.cs | 5 +-
.../Internal/IKafkaIntegerValueGenerator.cs | 3 +
.../Internal/KafkaIntegerValueGenerator.cs | 10 +-
.../Internal/KafkaValueGeneratorSelector.cs | 6 +-
.../templates/templates/kefcoreApp/Program.cs | 2 +-
.../templates/kefcoreAppWithEvents/Program.cs | 2 +-
test/KEFCore.Benchmark.Avro.Test/Program.cs | 4 +-
test/KEFCore.Benchmark.Test/Program.cs | 4 +-
test/KEFCore.Complex.Test/Program.cs | 2 +-
test/KEFCore.Test/Program.cs | 2 +-
61 files changed, 515 insertions(+), 390 deletions(-)
delete mode 100644 src/net/KEFCore/Storage/Internal/KafkaTableSnapshot.cs
diff --git a/src/documentation/articles/howitworks.md b/src/documentation/articles/howitworks.md
index 187b7fba..aa58ef12 100644
--- a/src/documentation/articles/howitworks.md
+++ b/src/documentation/articles/howitworks.md
@@ -38,7 +38,11 @@ Using the [topic compaction](https://kafka.apache.org/documentation/#compaction)
- Update: a producer storing a new record with a previously stored unique key will discard the old records
- Delete: a producer storing a new record with a previously stored unique key, and value set to null, will delete all records with that unique key
-All CRUD operations are helped, behind the scene, from [`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) and/or [`KNetProducer`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Producer/KNetProducer.cs)/[Apache Kafka Streams](https://kafka.apache.org/documentation/streams/).
+All CRUD operations are helped, behind the scene, from [`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) or [`KNetProducer`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Producer/KNetProducer.cs)/[Apache Kafka Streams](https://kafka.apache.org/documentation/streams/).
+
+### First-level cache
+
+[`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) or [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/) act as first-level cache of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/): **data coming from the Apache Kafka cluster updates their content while the system is running without a specific request**.
### Data storage
diff --git a/src/documentation/articles/kafkadbcontext.md b/src/documentation/articles/kafkadbcontext.md
index 3f5dd8ab..d399a4b0 100644
--- a/src/documentation/articles/kafkadbcontext.md
+++ b/src/documentation/articles/kafkadbcontext.md
@@ -6,9 +6,10 @@
- **KeySerializationType**: the .NET type to be used to allocate an external serializer for Apache Kafka record key
- **ValueSerializationType**: the .NET type to be used to allocate an external serializer for Apache Kafka record value
- **ValueContainerType**: the .NET type to be used to allocate an external container class for Apache Kafka record value
+ - **UseNameMatching**: set to **false** to avoid Entity matching based on Name
- **BootstrapServers**: the server hosting the broker of Apache Kafka
- **ApplicationId**: the application identifier used to identify the context
- - **DbName**: the user defined name which declares the database name, it is used to prepend every Topic which belongs to this database
+ - **DatabaseName**: the user defined name which declares the database name, it is used to prepend every Topic which belongs to this database
- **DefaultNumPartitions**: the default number of partitions used when topics are created for each entity
- **DefaultReplicationFactor**: the replication factor to use when data are stored in Apache Kafka
- **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true**
diff --git a/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs b/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs
index cfe7625a..f66804f8 100644
--- a/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs
+++ b/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
@@ -32,6 +29,10 @@ namespace MASES.EntityFrameworkCore.KNet.Design.Internal;
///
public class KafkaCSharpRuntimeAnnotationCodeGenerator : CSharpRuntimeAnnotationCodeGenerator
{
+ ///
+ /// Default initializer
+ ///
+ ///
public KafkaCSharpRuntimeAnnotationCodeGenerator(
CSharpRuntimeAnnotationCodeGeneratorDependencies dependencies)
: base(dependencies)
diff --git a/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs b/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs
index a60384b1..b2f01b24 100644
--- a/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs
+++ b/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
@@ -32,6 +29,7 @@ namespace MASES.EntityFrameworkCore.KNet.Design.Internal;
///
public class KafkaDesignTimeServices : IDesignTimeServices
{
+ ///
public virtual void ConfigureDesignTimeServices(IServiceCollection serviceCollection)
{
serviceCollection.AddEntityFrameworkKafkaDatabase();
diff --git a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs
index c27cbeb6..329e7076 100644
--- a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs
+++ b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
diff --git a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs
index 0062a21d..3e4e541d 100644
--- a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs
+++ b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
diff --git a/src/net/KEFCore/Diagnostics/KafkaEventId.cs b/src/net/KEFCore/Diagnostics/KafkaEventId.cs
index 09675ee4..8cc86b61 100644
--- a/src/net/KEFCore/Diagnostics/KafkaEventId.cs
+++ b/src/net/KEFCore/Diagnostics/KafkaEventId.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
diff --git a/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs b/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs
index 1a2b08b1..8251b436 100644
--- a/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs
+++ b/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
diff --git a/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs b/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs
index 866c9605..85f96e7a 100644
--- a/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs
+++ b/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
@@ -125,22 +122,28 @@ var coreOptionsExtension
((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(coreOptionsExtension);
}
-
+ ///
+ /// Creates a serializer for keys
+ ///
public static Type SerializerTypeForKey(this IKafkaSingletonOptions options, IEntityType entityType)
{
var primaryKey = entityType.FindPrimaryKey()!.GetKeyType();
- return options.KeySerializationType.MakeGenericType(primaryKey);
+ return options.KeySerializationType?.MakeGenericType(primaryKey)!;
}
-
+ ///
+ /// Creates a serialzier for values
+ ///
public static Type SerializerTypeForValue(this IKafkaSingletonOptions options, IEntityType entityType)
{
var primaryKey = entityType.FindPrimaryKey()!.GetKeyType();
- return options.ValueSerializationType.MakeGenericType(ValueContainerType(options, entityType));
+ return options.ValueSerializationType?.MakeGenericType(ValueContainerType(options, entityType))!;
}
-
+ ///
+ /// Create the ValueContainer
+ ///
public static Type ValueContainerType(this IKafkaSingletonOptions options, IEntityType entityType)
{
var primaryKey = entityType.FindPrimaryKey()!.GetKeyType();
- return options.ValueContainerType.MakeGenericType(primaryKey);
+ return options.ValueContainerType?.MakeGenericType(primaryKey)!;
}
}
diff --git a/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs b/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs
index ba28565f..b0a6e462 100644
--- a/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs
+++ b/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
diff --git a/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs b/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs
index 9a158759..fdee1d9f 100644
--- a/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs
+++ b/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
@@ -90,34 +87,46 @@ public static void SetKafkaQuery(
=> entityType.FindAnnotation(CoreAnnotationNames.DefiningQuery)?.GetConfigurationSource();
#pragma warning restore CS0612 // Il tipo o il membro è obsoleto
#pragma warning restore EF1001 // Internal EF Core API usage.
-
+ ///
+ /// Creates the topic name
+ ///
public static string TopicName(this IEntityType entityType, KafkaOptionsExtension options)
{
return $"{options.DatabaseName}.{entityType.Name}";
}
-
+ ///
+ /// Creates the storage id
+ ///
public static string StorageIdForTable(this IEntityType entityType, KafkaOptionsExtension options)
{
return $"Table_{entityType.TopicName(options)}";
}
-
+ ///
+ /// Creates the application id
+ ///
public static string ApplicationIdForTable(this IEntityType entityType, KafkaOptionsExtension options)
{
return $"{options.ApplicationId}_{entityType.Name}";
}
-
+ ///
+ /// Gets replication factor
+ ///
public static short ReplicationFactor(this IEntityType entityType, KafkaOptionsExtension options)
{
var replicationFactor = options.DefaultReplicationFactor;
return replicationFactor;
}
-
+ ///
+ /// Gets number of partitions
+ ///
public static int NumPartitions(this IEntityType entityType, KafkaOptionsExtension options)
{
var numPartitions = options.DefaultNumPartitions;
return numPartitions;
}
-
+ ///
+ /// Gets consumer instances
+ ///
public static int? ConsumerInstances(this IEntityType entityType, KafkaOptionsExtension options)
{
var consumerInstances = options.DefaultConsumerInstances;
diff --git a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs
index ef69994f..68aed60e 100644
--- a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs
+++ b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
diff --git a/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs b/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs
index e1052d33..94d9cb54 100644
--- a/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs
+++ b/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs
@@ -1,7 +1,4 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-/*
+/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
index ab6aecae..058967c3 100644
--- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
@@ -32,39 +32,40 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
///
public interface IKafkaSingletonOptions : ISingletonOptions
{
- Type KeySerializationType { get; }
-
- Type ValueSerializationType { get; }
-
- Type ValueContainerType { get; }
-
+ ///
+ Type? KeySerializationType { get; }
+ ///
+ Type? ValueSerializationType { get; }
+ ///
+ Type? ValueContainerType { get; }
+ ///
bool UseNameMatching { get; }
-
+ ///
string? DatabaseName { get; }
-
+ ///
string? ApplicationId { get; }
-
+ ///
string? BootstrapServers { get; }
-
+ ///
bool UseDeletePolicyForTopic { get; }
-
+ ///
bool UseCompactedReplicator { get; }
-
+ ///
bool UsePersistentStorage { get; }
-
+ ///
int DefaultNumPartitions { get; }
-
+ ///
int? DefaultConsumerInstances { get; }
-
+ ///
int DefaultReplicationFactor { get; }
-
+ ///
ConsumerConfigBuilder? ConsumerConfig { get; }
-
+ ///
ProducerConfigBuilder? ProducerConfig { get; }
-
+ ///
StreamsConfigBuilder? StreamsConfig { get; }
-
+ ///
TopicConfigBuilder? TopicConfig { get; }
-
- Action OnChangeEvent { get; }
+ ///
+ Action? OnChangeEvent { get; }
}
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs
index 68648e8b..87df64c2 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs
@@ -25,11 +25,14 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
///
public class KafkaModelValidator : ModelValidator
{
+ ///
+ /// Initializer
+ ///
public KafkaModelValidator(ModelValidatorDependencies dependencies)
: base(dependencies)
{
}
-
+ ///
public override void Validate(IModel model, IDiagnosticsLogger logger)
{
base.Validate(model, logger);
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
index f3dd184c..83e45595 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
@@ -25,7 +25,6 @@
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Producer;
-using MASES.KNet.Serialization;
using MASES.KNet.Streams;
using System.Globalization;
@@ -60,11 +59,15 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader;
static Java.Lang.ClassLoader SystemClassLoader => _loader;
-
+ ///
+ /// Initializer
+ ///
public KafkaOptionsExtension()
{
}
-
+ ///
+ /// Initializer
+ ///
protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
{
_keySerializationType = copyFrom._keySerializationType;
@@ -86,49 +89,51 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder);
_onChangeEvent = copyFrom._onChangeEvent;
}
-
+ ///
public virtual DbContextOptionsExtensionInfo Info => _info ??= new ExtensionInfo(this);
-
+ ///
protected virtual KafkaOptionsExtension Clone() => new(this);
-
+ ///
+ /// Internal property
+ ///
public virtual string ClusterId => _bootstrapServers!;
-
+ ///
public virtual Type KeySerializationType => _keySerializationType;
-
+ ///
public virtual Type ValueSerializationType => _valueSerializationType;
-
+ ///
public virtual Type ValueContainerType => _valueContainerType;
-
+ ///
public virtual bool UseNameMatching => _useNameMatching;
-
+ ///
public virtual string DatabaseName => _databaseName!;
-
+ ///
public virtual string ApplicationId => _applicationId!;
-
+ ///
public virtual string BootstrapServers => _bootstrapServers!;
-
+ ///
public virtual bool UseDeletePolicyForTopic => _useDeletePolicyForTopic;
-
+ ///
public virtual bool UseCompactedReplicator => _useCompactedReplicator;
-
+ ///
public virtual bool UsePersistentStorage => _usePersistentStorage;
-
+ ///
public virtual int DefaultNumPartitions => _defaultNumPartitions;
-
+ ///
public virtual int? DefaultConsumerInstances => _defaultConsumerInstances;
-
+ ///
public virtual short DefaultReplicationFactor => _defaultReplicationFactor;
-
+ ///
public virtual ConsumerConfigBuilder ConsumerConfig => _consumerConfigBuilder!;
-
+ ///
public virtual ProducerConfigBuilder ProducerConfig => _producerConfigBuilder!;
-
+ ///
public virtual StreamsConfigBuilder StreamsConfig => _streamsConfigBuilder!;
-
+ ///
public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!;
-
+ ///
public virtual Action OnChangeEvent => _onChangeEvent!;
-
+ ///
public virtual KafkaOptionsExtension WithKeySerializationType(Type serializationType)
{
if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\"");
@@ -139,7 +144,7 @@ public virtual KafkaOptionsExtension WithKeySerializationType(Type serialization
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithValueSerializationType(Type serializationType)
{
if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\"");
@@ -150,7 +155,7 @@ public virtual KafkaOptionsExtension WithValueSerializationType(Type serializati
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithValueContainerType(Type serializationType)
{
if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\"");
@@ -161,7 +166,7 @@ public virtual KafkaOptionsExtension WithValueContainerType(Type serializationTy
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithUseNameMatching(bool useNameMatching = true)
{
var clone = Clone();
@@ -170,7 +175,7 @@ public virtual KafkaOptionsExtension WithUseNameMatching(bool useNameMatching =
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithDatabaseName(string databaseName)
{
var clone = Clone();
@@ -179,7 +184,7 @@ public virtual KafkaOptionsExtension WithDatabaseName(string databaseName)
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithApplicationId(string applicationId)
{
var clone = Clone();
@@ -188,7 +193,7 @@ public virtual KafkaOptionsExtension WithApplicationId(string applicationId)
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServers)
{
var clone = Clone();
@@ -197,7 +202,7 @@ public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServer
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false)
{
var clone = Clone();
@@ -206,7 +211,7 @@ public virtual KafkaOptionsExtension WithUseDeletePolicyForTopic(bool useDeleteP
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = true)
{
var clone = Clone();
@@ -215,7 +220,7 @@ public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedRe
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithUsePersistentStorage(bool usePersistentStorage = false)
{
var clone = Clone();
@@ -224,7 +229,7 @@ public virtual KafkaOptionsExtension WithUsePersistentStorage(bool usePersistent
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithDefaultNumPartitions(int defaultNumPartitions = 1)
{
var clone = Clone();
@@ -233,7 +238,7 @@ public virtual KafkaOptionsExtension WithDefaultNumPartitions(int defaultNumPart
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithDefaultConsumerInstances(int? defaultConsumerInstances = null)
{
var clone = Clone();
@@ -242,7 +247,7 @@ public virtual KafkaOptionsExtension WithDefaultConsumerInstances(int? defaultCo
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultReplicationFactor = 1)
{
var clone = Clone();
@@ -251,7 +256,7 @@ public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultR
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder)
{
var clone = Clone();
@@ -260,7 +265,7 @@ public virtual KafkaOptionsExtension WithConsumerConfig(ConsumerConfigBuilder co
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithProducerConfig(ProducerConfigBuilder producerConfigBuilder)
{
var clone = Clone();
@@ -269,7 +274,7 @@ public virtual KafkaOptionsExtension WithProducerConfig(ProducerConfigBuilder pr
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithStreamsConfig(StreamsConfigBuilder streamsConfigBuilder)
{
var clone = Clone();
@@ -278,7 +283,7 @@ public virtual KafkaOptionsExtension WithStreamsConfig(StreamsConfigBuilder stre
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicConfigBuilder)
{
var clone = Clone();
@@ -287,7 +292,7 @@ public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicCon
return clone;
}
-
+ ///
public virtual KafkaOptionsExtension WithOnChangeEvent(Action onChangeEvent)
{
var clone = Clone();
@@ -296,12 +301,16 @@ public virtual KafkaOptionsExtension WithOnChangeEvent(Action
+ /// Build for
+ ///
public virtual Properties StreamsOptions(IEntityType entityType)
{
return StreamsOptions(entityType.ApplicationIdForTable(this));
}
-
+ ///
+ /// Build for applicationId
+ ///
public virtual Properties StreamsOptions(string applicationId)
{
Properties props = _streamsConfigBuilder ?? new();
@@ -333,7 +342,9 @@ public virtual Properties StreamsOptions(string applicationId)
return props;
}
-
+ ///
+ /// Build for producers
+ ///
public virtual Properties ProducerOptions()
{
Properties props = _producerConfigBuilder ?? new();
@@ -367,9 +378,9 @@ public virtual Properties ProducerOptions()
return props;
}
-
+ ///
public virtual void ApplyServices(IServiceCollection services) => services.AddEntityFrameworkKafkaDatabase();
-
+ ///
public virtual void Validate(IDbContextOptions options)
{
var kafkaOptions = options.FindExtension();
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
index 48cf7439..26427b91 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
@@ -30,6 +30,7 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
///
public class KafkaSingletonOptions : IKafkaSingletonOptions
{
+ ///
public virtual void Initialize(IDbContextOptions options)
{
var kafkaOptions = options.FindExtension();
@@ -56,7 +57,7 @@ public virtual void Initialize(IDbContextOptions options)
OnChangeEvent = kafkaOptions.OnChangeEvent;
}
}
-
+ ///
public virtual void Validate(IDbContextOptions options)
{
var kafkaOptions = options.FindExtension();
@@ -70,40 +71,40 @@ public virtual void Validate(IDbContextOptions options)
nameof(DbContextOptionsBuilder.UseInternalServiceProvider)));
}
}
-
- public virtual Type KeySerializationType { get; private set; }
-
- public virtual Type ValueSerializationType { get; private set; }
-
- public virtual Type ValueContainerType { get; private set; }
-
+ ///
+ public virtual Type? KeySerializationType { get; private set; }
+ ///
+ public virtual Type? ValueSerializationType { get; private set; }
+ ///
+ public virtual Type? ValueContainerType { get; private set; }
+ ///
public virtual bool UseNameMatching { get; private set; }
-
+ ///
public virtual string? DatabaseName { get; private set; }
-
+ ///
public virtual string? ApplicationId { get; private set; }
-
+ ///
public virtual string? BootstrapServers { get; private set; }
-
+ ///
public virtual bool UseDeletePolicyForTopic { get; private set; }
-
+ ///
public virtual bool UseCompactedReplicator { get; private set; }
-
+ ///
public virtual bool UsePersistentStorage { get; private set; }
-
+ ///
public virtual int DefaultNumPartitions { get; private set; }
-
+ ///
public virtual int? DefaultConsumerInstances { get; private set; }
-
+ ///
public virtual int DefaultReplicationFactor { get; private set; }
-
+ ///
public virtual ConsumerConfigBuilder? ConsumerConfig { get; private set; }
-
+ ///
public virtual ProducerConfigBuilder? ProducerConfig { get; private set; }
-
+ ///
public virtual StreamsConfigBuilder? StreamsConfig { get; private set; }
-
+ ///
public virtual TopicConfigBuilder? TopicConfig { get; private set; }
-
- public virtual Action OnChangeEvent { get; private set; }
+ ///
+ public virtual Action? OnChangeEvent { get; private set; }
}
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
index 18626fb8..b5d488e5 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
@@ -168,6 +168,10 @@ public KafkaDbContext(DbContextOptions options) : base(options)
///
public virtual Type? ValueContainerType { get; set; } = null;
///
+ /// Set to to avoid match of s using
+ ///
+ public virtual bool UseNameMatching { get; set; } = true;
+ ///
/// The bootstrap servers of the Apache Kafka cluster
///
public virtual string? BootstrapServers { get; set; }
@@ -176,9 +180,9 @@ public KafkaDbContext(DbContextOptions options) : base(options)
///
public virtual string ApplicationId { get; set; } = Guid.NewGuid().ToString();
///
- /// Database name
+ /// Database name means whe prefix of the topics associated to the instance of
///
- public virtual string? DbName { get; set; }
+ public virtual string? DatabaseName { get; set; }
///
/// Default number of partitions associated to each topic
///
@@ -229,10 +233,11 @@ public KafkaDbContext(DbContextOptions options) : base(options)
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
if (BootstrapServers == null) throw new ArgumentNullException(nameof(BootstrapServers));
- if (DbName == null) throw new ArgumentNullException(nameof(DbName));
+ if (DatabaseName == null) throw new ArgumentNullException(nameof(DatabaseName));
- optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) =>
+ optionsBuilder.UseKafkaCluster(ApplicationId, DatabaseName, BootstrapServers, (o) =>
{
+ o.WithUseNameMatching(UseNameMatching);
o.WithConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig);
o.WithProducerConfig(ProducerConfig ?? DefaultProducerConfig);
o.WithStreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions);
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
index 934895b0..8fe2704b 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
@@ -26,6 +23,7 @@
using MASES.KNet.Replicator;
using MASES.KNet.Streams;
using Org.Apache.Kafka.Clients.Producer;
+using Org.Apache.Kafka.Common.Config;
using Org.Apache.Kafka.Streams;
using Org.Apache.Kafka.Streams.Kstream;
using System.ComponentModel;
diff --git a/src/net/KEFCore/Query/Internal/AnonymousObject.cs b/src/net/KEFCore/Query/Internal/AnonymousObject.cs
index 7fff5f8b..77255dfa 100644
--- a/src/net/KEFCore/Query/Internal/AnonymousObject.cs
+++ b/src/net/KEFCore/Query/Internal/AnonymousObject.cs
@@ -45,11 +45,11 @@ public AnonymousObject(object[] values)
public static bool operator !=(AnonymousObject x, AnonymousObject y)
=> !x.Equals(y);
-
+ ///
public override bool Equals(object? obj)
=> obj is not null && (obj is AnonymousObject anonymousObject
&& _values.SequenceEqual(anonymousObject._values));
-
+ ///
public override int GetHashCode()
{
var hash = new HashCode();
diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs
index 669a5a46..80d7ddb5 100644
--- a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs
+++ b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
@@ -32,12 +29,18 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal;
public class KafkaQueryContext : QueryContext
{
private readonly IKafkaCluster _cluster;
-
+ ///
+ /// Retrieve for the specified
+ ///
+ ///
+ ///
public virtual IEnumerable GetValueBuffers(IEntityType entityType)
{
return _cluster.GetValueBuffers(entityType);
}
-
+ ///
+ /// Default initializer
+ ///
public KafkaQueryContext(QueryContextDependencies dependencies, IKafkaCluster cluster)
: base(dependencies)
{
diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs b/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs
index 3adfba7a..20ce8d0b 100644
--- a/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs
+++ b/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs
@@ -1,6 +1,3 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
/*
* Copyright 2023 MASES s.r.l.
*
@@ -31,7 +28,9 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal;
public class KafkaQueryContextFactory : IQueryContextFactory
{
private readonly IKafkaCluster _cluster;
-
+ ///
+ /// Default initializer
+ ///
public KafkaQueryContextFactory(
QueryContextDependencies dependencies,
IKafkaClusterCache clusterCache,
@@ -45,6 +44,6 @@ public KafkaQueryContextFactory(
/// Dependencies for this service.
///
protected virtual QueryContextDependencies Dependencies { get; }
-
+ ///
public virtual QueryContext Create() => new KafkaQueryContext(Dependencies, _cluster);
}
diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs b/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs
index 8ff2b23d..a311aa9e 100644
--- a/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs
+++ b/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs
@@ -639,7 +639,7 @@ public virtual EntityShaperExpression AddNavigationToWeakEntityType(
return entityShaper;
}
-
+ ///
public virtual ShapedQueryExpression Clone(Expression shaperExpression)
{
var clonedKafkaQueryExpression = Clone();
@@ -667,10 +667,10 @@ public virtual Expression GetSingleScalarProjection()
public virtual void ConvertToSingleResult(MethodInfo methodInfo)
=> _singleResultMethodInfo = methodInfo;
-
+ ///
public override Type Type
=> typeof(IEnumerable);
-
+ ///
public sealed override ExpressionType NodeType
=> ExpressionType.Extension;
diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs b/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs
index aa78f990..634a3aba 100644
--- a/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs
+++ b/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs
@@ -32,7 +32,7 @@ public class KafkaQueryableMethodTranslatingExpressionVisitor : QueryableMethodT
private readonly SharedTypeEntityExpandingExpressionVisitor _weakEntityExpandingExpressionVisitor;
private readonly KafkaProjectionBindingExpressionVisitor _projectionBindingExpressionVisitor;
private readonly IModel _model;
-
+ ///
public KafkaQueryableMethodTranslatingExpressionVisitor(
QueryableMethodTranslatingExpressionVisitorDependencies dependencies,
QueryCompilationContext queryCompilationContext)
@@ -43,7 +43,7 @@ public KafkaQueryableMethodTranslatingExpressionVisitor(
_projectionBindingExpressionVisitor = new KafkaProjectionBindingExpressionVisitor(this, _expressionTranslator);
_model = queryCompilationContext.Model;
}
-
+ ///
protected KafkaQueryableMethodTranslatingExpressionVisitor(
KafkaQueryableMethodTranslatingExpressionVisitor parentVisitor)
: base(parentVisitor.Dependencies, parentVisitor.QueryCompilationContext, subquery: true)
@@ -53,10 +53,10 @@ protected KafkaQueryableMethodTranslatingExpressionVisitor(
_projectionBindingExpressionVisitor = new KafkaProjectionBindingExpressionVisitor(this, _expressionTranslator);
_model = parentVisitor._model;
}
-
+ ///
protected override QueryableMethodTranslatingExpressionVisitor CreateSubqueryVisitor()
=> new KafkaQueryableMethodTranslatingExpressionVisitor(this);
-
+ ///
protected override Expression VisitExtension(Expression extensionExpression)
{
switch (extensionExpression)
@@ -75,7 +75,7 @@ protected override Expression VisitExtension(Expression extensionExpression)
return base.VisitExtension(extensionExpression);
}
}
-
+ ///
protected override Expression VisitMethodCall(MethodCallExpression methodCallExpression)
{
if (methodCallExpression.Method.IsGenericMethod
@@ -90,11 +90,13 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
return base.VisitMethodCall(methodCallExpression);
}
#if NET6_0
+ ///
protected override ShapedQueryExpression CreateShapedQueryExpression(Type elementType)
{
throw new NotImplementedException();
}
#endif
+ ///
protected override ShapedQueryExpression CreateShapedQueryExpression(IEntityType entityType)
=> CreateShapedQueryExpressionStatic(entityType);
@@ -112,7 +114,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
typeof(ValueBuffer)),
false));
}
-
+ ///
protected override ShapedQueryExpression? TranslateAll(ShapedQueryExpression source, LambdaExpression predicate)
{
predicate = Expression.Lambda(Expression.Not(predicate.Body), predicate.Parameters);
@@ -139,7 +141,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(bool)));
}
-
+ ///
protected override ShapedQueryExpression? TranslateAny(ShapedQueryExpression source, LambdaExpression? predicate)
{
if (predicate != null)
@@ -167,21 +169,21 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(bool)));
}
-
+ ///
protected override ShapedQueryExpression? TranslateAverage(
ShapedQueryExpression source,
LambdaExpression? selector,
Type resultType)
=> TranslateScalarAggregate(source, selector, nameof(Enumerable.Average), resultType);
-
+ ///
protected override ShapedQueryExpression? TranslateCast(ShapedQueryExpression source, Type resultType)
=> source.ShaperExpression.Type != resultType
? source.UpdateShaperExpression(Expression.Convert(source.ShaperExpression, resultType))
: source;
-
+ ///
protected override ShapedQueryExpression? TranslateConcat(ShapedQueryExpression source1, ShapedQueryExpression source2)
=> TranslateSetOperation(EnumerableMethods.Concat, source1, source2);
-
+ ///
protected override ShapedQueryExpression? TranslateContains(ShapedQueryExpression source, Expression item)
{
var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression;
@@ -207,7 +209,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(bool)));
}
-
+ ///
protected override ShapedQueryExpression? TranslateCount(ShapedQueryExpression source, LambdaExpression? predicate)
{
if (predicate != null)
@@ -235,7 +237,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(int)));
}
-
+ ///
protected override ShapedQueryExpression? TranslateDefaultIfEmpty(ShapedQueryExpression source, Expression? defaultValue)
{
if (defaultValue == null)
@@ -246,23 +248,23 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
return null;
}
-
+ ///
protected override ShapedQueryExpression? TranslateDistinct(ShapedQueryExpression source)
{
((KafkaQueryExpression)source.QueryExpression).ApplyDistinct();
return source;
}
-
+ ///
protected override ShapedQueryExpression? TranslateElementAtOrDefault(
ShapedQueryExpression source,
Expression index,
bool returnDefault)
=> null;
-
+ ///
protected override ShapedQueryExpression? TranslateExcept(ShapedQueryExpression source1, ShapedQueryExpression source2)
=> TranslateSetOperation(EnumerableMethods.Except, source1, source2);
-
+ ///
protected override ShapedQueryExpression? TranslateFirstOrDefault(
ShapedQueryExpression source,
LambdaExpression? predicate,
@@ -276,7 +278,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
? EnumerableMethods.FirstOrDefaultWithoutPredicate
: EnumerableMethods.FirstWithoutPredicate);
-
+ ///
protected override ShapedQueryExpression? TranslateGroupBy(
ShapedQueryExpression source,
LambdaExpression keySelector,
@@ -376,7 +378,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
: Expression.Convert(translation, expression.Type);
}
}
-
+ ///
protected override ShapedQueryExpression? TranslateGroupJoin(
ShapedQueryExpression outer,
ShapedQueryExpression inner,
@@ -384,10 +386,10 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy
LambdaExpression innerKeySelector,
LambdaExpression resultSelector)
=> null;
-
+ ///
protected override ShapedQueryExpression? TranslateIntersect(ShapedQueryExpression source1, ShapedQueryExpression source2)
=> TranslateSetOperation(EnumerableMethods.Intersect, source1, source2);
-
+ ///
protected override ShapedQueryExpression? TranslateJoin(
ShapedQueryExpression outer,
ShapedQueryExpression inner,
@@ -523,7 +525,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner)
&& !inner.Type.IsNullableType()
&& outer.Type.UnwrapNullableType() == inner.Type;
}
-
+ ///
protected override ShapedQueryExpression? TranslateLastOrDefault(
ShapedQueryExpression source,
LambdaExpression? predicate,
@@ -536,7 +538,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner)
returnDefault
? EnumerableMethods.LastOrDefaultWithoutPredicate
: EnumerableMethods.LastWithoutPredicate);
-
+ ///
protected override ShapedQueryExpression? TranslateLeftJoin(
ShapedQueryExpression outer,
ShapedQueryExpression inner,
@@ -565,7 +567,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner)
return TranslateTwoParameterSelector(outer, resultSelector);
}
-
+ ///
protected override ShapedQueryExpression? TranslateLongCount(ShapedQueryExpression source, LambdaExpression? predicate)
{
if (predicate != null)
@@ -594,16 +596,16 @@ static bool IsConvertedToNullable(Expression outer, Expression inner)
return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(long)));
}
-
+ ///
protected override ShapedQueryExpression? TranslateMax(
ShapedQueryExpression source,
LambdaExpression? selector,
Type resultType)
=> TranslateScalarAggregate(source, selector, nameof(Enumerable.Max), resultType);
-
+ ///
protected override ShapedQueryExpression? TranslateMin(ShapedQueryExpression source, LambdaExpression? selector, Type resultType)
=> TranslateScalarAggregate(source, selector, nameof(Enumerable.Min), resultType);
-
+ ///
protected override ShapedQueryExpression? TranslateOfType(ShapedQueryExpression source, Type resultType)
{
if (source.ShaperExpression is EntityShaperExpression entityShaperExpression)
@@ -651,7 +653,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner)
return null;
}
-
+ ///
protected override ShapedQueryExpression? TranslateOrderBy(
ShapedQueryExpression source,
LambdaExpression keySelector,
@@ -676,7 +678,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner)
return source;
}
-
+ ///
protected override ShapedQueryExpression? TranslateReverse(ShapedQueryExpression source)
{
var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression;
@@ -688,7 +690,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner)
return source;
}
-
+ ///
protected override ShapedQueryExpression TranslateSelect(ShapedQueryExpression source, LambdaExpression selector)
{
if (selector.Body == selector.Parameters[0])
@@ -702,7 +704,7 @@ protected override ShapedQueryExpression TranslateSelect(ShapedQueryExpression s
return source.UpdateShaperExpression(newShaper);
}
-
+ ///
protected override ShapedQueryExpression? TranslateSelectMany(
ShapedQueryExpression source,
LambdaExpression collectionSelector,
@@ -748,7 +750,7 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
return base.VisitMethodCall(methodCallExpression);
}
}
-
+ ///
protected override ShapedQueryExpression? TranslateSelectMany(ShapedQueryExpression source, LambdaExpression selector)
{
var innerParameter = Expression.Parameter(selector.ReturnType.GetSequenceType(), "i");
@@ -757,7 +759,7 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
return TranslateSelectMany(source, selector, resultSelector);
}
-
+ ///
protected override ShapedQueryExpression? TranslateSingleOrDefault(
ShapedQueryExpression source,
LambdaExpression? predicate,
@@ -770,7 +772,7 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
returnDefault
? EnumerableMethods.SingleOrDefaultWithoutPredicate
: EnumerableMethods.SingleWithoutPredicate);
-
+ ///
protected override ShapedQueryExpression? TranslateSkip(ShapedQueryExpression source, Expression count)
{
var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression;
@@ -790,13 +792,13 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
return source;
}
-
+ ///
protected override ShapedQueryExpression? TranslateSkipWhile(ShapedQueryExpression source, LambdaExpression predicate)
=> null;
-
+ ///
protected override ShapedQueryExpression? TranslateSum(ShapedQueryExpression source, LambdaExpression? selector, Type resultType)
=> TranslateScalarAggregate(source, selector, nameof(Enumerable.Sum), resultType);
-
+ ///
protected override ShapedQueryExpression? TranslateTake(ShapedQueryExpression source, Expression count)
{
var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression;
@@ -816,10 +818,10 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
return source;
}
-
+ ///
protected override ShapedQueryExpression? TranslateTakeWhile(ShapedQueryExpression source, LambdaExpression predicate)
=> null;
-
+ ///
protected override ShapedQueryExpression? TranslateThenBy(
ShapedQueryExpression source,
LambdaExpression keySelector,
@@ -843,10 +845,10 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
return source;
}
-
+ ///
protected override ShapedQueryExpression? TranslateUnion(ShapedQueryExpression source1, ShapedQueryExpression source2)
=> TranslateSetOperation(EnumerableMethods.Union, source1, source2);
-
+ ///
protected override ShapedQueryExpression? TranslateWhere(ShapedQueryExpression source, LambdaExpression predicate)
{
var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression;
diff --git a/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs b/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs
index 1d110592..f546d5a2 100644
--- a/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs
+++ b/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs
@@ -30,7 +30,9 @@ public partial class KafkaShapedQueryCompilingExpressionVisitor : ShapedQueryCom
{
private readonly Type _contextType;
private readonly bool _threadSafetyChecksEnabled;
-
+ ///
+ /// Default initilizer
+ ///
public KafkaShapedQueryCompilingExpressionVisitor(
ShapedQueryCompilingExpressionVisitorDependencies dependencies,
QueryCompilationContext queryCompilationContext)
@@ -39,7 +41,7 @@ public KafkaShapedQueryCompilingExpressionVisitor(
_contextType = queryCompilationContext.ContextType;
_threadSafetyChecksEnabled = dependencies.CoreSingletonOptions.AreThreadSafetyChecksEnabled;
}
-
+ ///
protected override Expression VisitExtension(Expression extensionExpression)
{
switch (extensionExpression)
@@ -53,7 +55,7 @@ protected override Expression VisitExtension(Expression extensionExpression)
return base.VisitExtension(extensionExpression);
}
-
+ ///
protected override Expression VisitShapedQuery(ShapedQueryExpression shapedQueryExpression)
{
var kafkaQueryExpression = (KafkaQueryExpression)shapedQueryExpression.QueryExpression;
diff --git a/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs b/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs
index e91741a5..7740210c 100644
--- a/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs
+++ b/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs
@@ -28,6 +28,9 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal;
///
public class KafkaShapedQueryCompilingExpressionVisitorFactory : IShapedQueryCompilingExpressionVisitorFactory
{
+ ///
+ /// Default initializer
+ ///
public KafkaShapedQueryCompilingExpressionVisitorFactory(
ShapedQueryCompilingExpressionVisitorDependencies dependencies)
{
@@ -38,7 +41,7 @@ public KafkaShapedQueryCompilingExpressionVisitorFactory(
/// Dependencies for this service.
///
protected virtual ShapedQueryCompilingExpressionVisitorDependencies Dependencies { get; }
-
+ ///
public virtual ShapedQueryCompilingExpressionVisitor Create(QueryCompilationContext queryCompilationContext)
=> new KafkaShapedQueryCompilingExpressionVisitor(Dependencies, queryCompilationContext);
}
diff --git a/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs b/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs
index 7fd627da..18408629 100644
--- a/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs
+++ b/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs
@@ -28,19 +28,24 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal;
///
public class KafkaTableExpression : Expression, IPrintableExpression
{
+ ///
+ /// Default initializer
+ ///
public KafkaTableExpression(IEntityType entityType)
{
EntityType = entityType;
}
-
+ ///
public override Type Type
=> typeof(IEnumerable);
-
+ ///
+ /// associated to the
+ ///
public virtual IEntityType EntityType { get; }
-
+ ///
public sealed override ExpressionType NodeType
=> ExpressionType.Extension;
-
+ ///
protected override Expression VisitChildren(ExpressionVisitor visitor)
=> this;
diff --git a/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs b/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs
index a4ab613d..d2273933 100644
--- a/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs
+++ b/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs
@@ -28,6 +28,9 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal;
///
public class SingleResultShaperExpression : Expression, IPrintableExpression
{
+ ///
+ /// Default initializer
+ ///
public SingleResultShaperExpression(
Expression projection,
Expression innerShaper)
@@ -36,7 +39,7 @@ public SingleResultShaperExpression(
InnerShaper = innerShaper;
Type = innerShaper.Type;
}
-
+ ///
protected override Expression VisitChildren(ExpressionVisitor visitor)
{
var projection = visitor.Visit(Projection);
@@ -49,14 +52,18 @@ public virtual SingleResultShaperExpression Update(Expression projection, Expres
=> projection != Projection || innerShaper != InnerShaper
? new SingleResultShaperExpression(projection, innerShaper)
: this;
-
+ ///
public sealed override ExpressionType NodeType
=> ExpressionType.Extension;
-
+ ///
public override Type Type { get; }
-
+ ///
+ /// Projection
+ ///
public virtual Expression Projection { get; }
-
+ ///
+ /// Inner shaper
+ ///
public virtual Expression InnerShaper { get; }
void IPrintableExpression.Print(ExpressionPrinter expressionPrinter)
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
index f49e03a0..8cf8a1f6 100644
--- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
@@ -47,9 +47,9 @@ public class EntityTypeProducer? _kafkaCompactedReplicator;
private readonly IKNetProducer? _kafkaProducer;
- private readonly IKafkaStreamsBaseRetriever _streamData;
- private readonly IKNetSerDes _keySerdes;
- private readonly IKNetSerDes _valueSerdes;
+ private readonly IKafkaStreamsBaseRetriever? _streamData;
+ private readonly IKNetSerDes? _keySerdes;
+ private readonly IKNetSerDes? _valueSerdes;
private readonly Action? _onChangeEvent;
#region KNetCompactedReplicatorEnumerable
@@ -67,8 +67,8 @@ class KNetCompactedReplicatorEnumerator : IEnumerator
Stopwatch _valueBufferSw = new Stopwatch();
#endif
readonly IEntityType _entityType;
- IKNetCompactedReplicator? _kafkaCompactedReplicator;
- readonly IEnumerator> _enumerator;
+ readonly IKNetCompactedReplicator? _kafkaCompactedReplicator;
+ readonly IEnumerator>? _enumerator;
public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator? kafkaCompactedReplicator)
{
_entityType = entityType;
@@ -95,7 +95,7 @@ public ValueBuffer Current
{
_currentSw.Start();
#endif
- return _current.HasValue ? _current.Value : default;
+ return _current ?? default;
#if DEBUG_PERFORMANCE
}
finally
@@ -127,13 +127,13 @@ public bool MoveNext()
{
_moveNextSw.Start();
#endif
- if (_enumerator.MoveNext())
+ if (_enumerator != null && _enumerator.MoveNext())
{
#if DEBUG_PERFORMANCE
_cycles++;
_valueBufferSw.Start();
#endif
- object[] array = null;
+ object[] array = null!;
_enumerator.Current.Value.GetData(_entityType, ref array);
#if DEBUG_PERFORMANCE
_valueBufferSw.Stop();
@@ -180,7 +180,9 @@ IEnumerator IEnumerable.GetEnumerator()
}
}
#endregion
-
+ ///
+ /// Default initializer
+ ///
public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
{
#if DEBUG_PERFORMANCE
@@ -197,6 +199,9 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
_keySerdes = new TKeySerializer() as IKNetSerDes;
_valueSerdes = new TValueSerializer() as IKNetSerDes;
+ if (_keySerdes == null) throw new InvalidOperationException($"{typeof(TKeySerializer)} is not a {typeof(IKNetSerDes)}");
+ if (_valueSerdes == null) throw new InvalidOperationException($"{typeof(TValueSerializer)} is not a {typeof(IKNetSerDes)}");
+
if (_useCompactedReplicator)
{
_kafkaCompactedReplicator = new KNetCompactedReplicator()
@@ -230,12 +235,12 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
else
{
_kafkaProducer = new KNetProducer(_cluster.Options.ProducerOptions(), _keySerdes, _valueSerdes);
- _streamData = new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes, _valueSerdes);
+ _streamData = new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes!, _valueSerdes!);
}
}
-
+ ///
public virtual IEntityType EntityType => _entityType;
-
+ ///
public IEnumerable> Commit(IEnumerable records)
{
if (_useCompactedReplicator)
@@ -246,7 +251,7 @@ public IEnumerable> Commit(IEnumerable reco
if (_kafkaCompactedReplicator != null) _kafkaCompactedReplicator[record.Key] = value!;
}
- return null;
+ return null!;
}
else
{
@@ -254,7 +259,7 @@ public IEnumerable> Commit(IEnumerable reco
foreach (KafkaRowBag record in records)
{
var future = _kafkaProducer?.Send(new KNetProducerRecord(record.AssociatedTopicName, 0, record.Key, record.Value(TValueContainerConstructor)!));
- futures.Add(future);
+ futures.Add(future!);
}
_kafkaProducer?.Flush();
@@ -262,7 +267,7 @@ public IEnumerable> Commit(IEnumerable reco
return futures;
}
}
-
+ ///
public void Dispose()
{
if (_kafkaCompactedReplicator != null)
@@ -280,7 +285,7 @@ public void Dispose()
_streamData?.Dispose();
}
}
-
+ ///
public IEnumerable ValueBuffers
{
get
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs
index 8ebfa800..b309948c 100644
--- a/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs
@@ -30,9 +30,10 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
///
public class EntityTypeProducers
{
- static IEntityTypeProducer? _globalProducer = null;
static readonly ConcurrentDictionary _producers = new();
-
+ ///
+ /// Allocates a new
+ ///
public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster)
where TKey : notnull
where TValueContainer : class, IValueContainer
@@ -41,7 +42,9 @@ public static IEntityTypeProducer Create CreateProducerLocal(entityType, cluster));
}
-
+ ///
+ /// Dispose a previously allocated
+ ///
public static void Dispose(IEntityTypeProducer producer)
{
if (!_producers.TryRemove(new KeyValuePair(producer.EntityType, producer)))
diff --git a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
index 57d1a720..b88487cc 100644
--- a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
+++ b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
@@ -30,9 +30,18 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
///
public interface IEntityTypeProducer : IDisposable
{
+ ///
+ /// Associated
+ ///
IEntityType EntityType { get; }
-
+ ///
+ /// Stores an
+ ///
+ /// The to be stored
+ ///
IEnumerable> Commit(IEnumerable records);
-
+ ///
+ /// The current s
+ ///
IEnumerable ValueBuffers { get; }
}
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
index f2108593..eb173296 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
@@ -28,21 +28,40 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
///
public interface IKafkaCluster : IDisposable
{
+ ///
+ /// Execute the
+ ///
bool EnsureDeleted(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger);
-
+ ///
+ /// Execute the
+ ///
bool EnsureCreated(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger);
-
+ ///
+ /// Execute the
+ ///
bool EnsureConnected(IModel designModel, IDiagnosticsLogger updateLogger);
-
+ ///
+ /// Creates a table for on Apache Kafka cluster
+ ///
string CreateTable(IEntityType entityType);
-
+ ///
+ /// Retrieve the
+ ///
IEnumerable GetValueBuffers(IEntityType entityType);
-
+ ///
+ /// Gets the
+ ///
KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property);
-
+ ///
+ /// Executes a transaction
+ ///
int ExecuteTransaction(IList entries, IDiagnosticsLogger updateLogger);
-
+ ///
+ /// The Apche Kafka cluster identifier
+ ///
string ClusterId { get; }
-
+ ///
+ /// The
+ ///
KafkaOptionsExtension Options { get; }
}
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs b/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs
index c46d9858..1b788841 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs
@@ -27,7 +27,12 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
///
public interface IKafkaClusterCache
{
+ ///
+ /// Gets an
+ ///
IKafkaCluster GetCluster(KafkaOptionsExtension options);
-
+ ///
+ /// Dispose an
+ ///
void Dispose(IKafkaCluster cluster);
}
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs b/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs
index 1e00226d..d3a056fc 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs
@@ -25,11 +25,20 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
///
public interface IKafkaDatabase : IDatabase, IDisposable
{
+ ///
+ /// The referring
+ ///
IKafkaCluster Cluster { get; }
-
+ ///
+ /// Execute the
+ ///
bool EnsureDatabaseDeleted();
-
+ ///
+ /// Execute the
+ ///
bool EnsureDatabaseCreated();
-
+ ///
+ /// Execute the
+ ///
bool EnsureDatabaseConnected();
}
\ No newline at end of file
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs
index cf3650b5..2c5945e4 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs
@@ -27,7 +27,12 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
///
public interface IKafkaRowBag
{
+ ///
+ /// The with changes
+ ///
IUpdateEntry UpdateEntry { get; }
-
+ ///
+ /// The topic data will be stored
+ ///
string AssociatedTopicName { get; }
}
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs
index 55cb0b97..2c65ab4c 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs
@@ -29,21 +29,36 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
///
public interface IKafkaTable : IEntityTypeProducer
{
+ ///
+ /// Create snapshot
+ ///
IReadOnlyList