From d8fd1fb02319ddc99d58ec65211aeca04ee17170 Mon Sep 17 00:00:00 2001 From: deleteLater Date: Tue, 14 Mar 2023 12:18:50 +0800 Subject: [PATCH] feat: add feature flag insights support --- examples/ConsoleApp/Program.cs | 2 +- .../Concurrent/AtomicBoolean.cs | 76 +++++++ .../Evaluation/EvalResult.cs | 2 +- src/FeatBit.ServerSdk/Evaluation/Evaluator.cs | 108 ++++++++-- .../Evaluation/IEvaluator.cs | 5 +- .../Events/DefaultEventBuffer.cs | 35 ++++ .../Events/DefaultEventDispatcher.cs | 198 ++++++++++++++++++ .../Events/DefaultEventDispather.Log.cs | 36 ++++ .../Events/DefaultEventProcessor.cs | 129 ++++++++++++ .../Events/DefaultEventSender.Log.cs | 26 +++ .../Events/DefaultEventSender.cs | 144 +++++++++++++ .../Events/DefaultEventSerializer.cs | 101 +++++++++ .../Events/DeliveryStatus.cs | 9 + src/FeatBit.ServerSdk/Events/IEvent.cs | 85 ++++++++ src/FeatBit.ServerSdk/Events/IEventBuffer.cs | 13 ++ .../Events/IEventDispatcher.cs | 8 + .../Events/IEventProcessor.cs | 48 +++++ src/FeatBit.ServerSdk/Events/IEventSender.cs | 9 + .../Events/IEventSerializer.cs | 11 + src/FeatBit.ServerSdk/FbClient.cs | 33 ++- .../FeatBit.ServerSdk.csproj | 6 + src/FeatBit.ServerSdk/Http/HttpErrors.cs | 21 ++ src/FeatBit.ServerSdk/Model/FeatureFlag.cs | 11 + src/FeatBit.ServerSdk/Options/FbOptions.cs | 63 +++++- .../Options/FbOptionsBuilder.cs | 85 +++++++- .../Properties/AssemblyInfo.cs | 3 +- .../Evaluation/EvaluatorTests.cs | 67 +++--- .../Events/AsyncEventTests.cs | 50 +++++ .../Events/DefaultEventBufferTests.cs | 40 ++++ .../Events/DefaultEventDispatcherTests.cs | 145 +++++++++++++ .../Events/DefaultEventProcessorTests.cs | 122 +++++++++++ .../Events/DefaultEventSenderTests.cs | 76 +++++++ ...lizerTests.SerializeEvalEvent.verified.txt | 27 +++ ...izerTests.SerializeEvalEvents.verified.txt | 48 +++++ .../Events/DefaultEventSerializerTests.cs | 74 +++++++ .../Events/IntEvent.cs | 11 + .../FeatBit.ServerSdk.Tests/FbClientTests.cs | 16 +- .../FeatBit.ServerSdk.Tests.csproj | 1 + tests/FeatBit.ServerSdk.Tests/Usings.cs | 3 +- 39 files changed, 1883 insertions(+), 64 deletions(-) create mode 100644 src/FeatBit.ServerSdk/Concurrent/AtomicBoolean.cs create mode 100644 src/FeatBit.ServerSdk/Events/DefaultEventBuffer.cs create mode 100644 src/FeatBit.ServerSdk/Events/DefaultEventDispatcher.cs create mode 100644 src/FeatBit.ServerSdk/Events/DefaultEventDispather.Log.cs create mode 100644 src/FeatBit.ServerSdk/Events/DefaultEventProcessor.cs create mode 100644 src/FeatBit.ServerSdk/Events/DefaultEventSender.Log.cs create mode 100644 src/FeatBit.ServerSdk/Events/DefaultEventSender.cs create mode 100644 src/FeatBit.ServerSdk/Events/DefaultEventSerializer.cs create mode 100644 src/FeatBit.ServerSdk/Events/DeliveryStatus.cs create mode 100644 src/FeatBit.ServerSdk/Events/IEvent.cs create mode 100644 src/FeatBit.ServerSdk/Events/IEventBuffer.cs create mode 100644 src/FeatBit.ServerSdk/Events/IEventDispatcher.cs create mode 100644 src/FeatBit.ServerSdk/Events/IEventProcessor.cs create mode 100644 src/FeatBit.ServerSdk/Events/IEventSender.cs create mode 100644 src/FeatBit.ServerSdk/Events/IEventSerializer.cs create mode 100644 src/FeatBit.ServerSdk/Http/HttpErrors.cs create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/AsyncEventTests.cs create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/DefaultEventBufferTests.cs create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/DefaultEventDispatcherTests.cs create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/DefaultEventProcessorTests.cs create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSenderTests.cs create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvent.verified.txt create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvents.verified.txt create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.cs create mode 100644 tests/FeatBit.ServerSdk.Tests/Events/IntEvent.cs diff --git a/examples/ConsoleApp/Program.cs b/examples/ConsoleApp/Program.cs index 7faf37f..ed01104 100644 --- a/examples/ConsoleApp/Program.cs +++ b/examples/ConsoleApp/Program.cs @@ -59,7 +59,7 @@ Console.WriteLine(); } -// Shuts down the client +// Shuts down the client to ensure all pending events are sent. await client.CloseAsync(); Environment.Exit(1); \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Concurrent/AtomicBoolean.cs b/src/FeatBit.ServerSdk/Concurrent/AtomicBoolean.cs new file mode 100644 index 0000000..fe374be --- /dev/null +++ b/src/FeatBit.ServerSdk/Concurrent/AtomicBoolean.cs @@ -0,0 +1,76 @@ +using System.Threading; + +// From: https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/Util/AtomicBoolean.cs +namespace FeatBit.Sdk.Server.Concurrent +{ + /// + /// Implementation of the java.concurrent.util.AtomicBoolean type. + /// + /// Uses internally to enforce ordering of writes + /// without any explicit locking. .NET's strong memory on write guarantees might already enforce + /// this ordering, but the addition of the MemoryBarrier guarantees it. + /// + public class AtomicBoolean + { + private const int FalseValue = 0; + private const int TrueValue = 1; + + private int _value; + + /// + /// Sets the initial value of this to . + /// + /// TBD + public AtomicBoolean(bool initialValue = false) + { + _value = initialValue ? TrueValue : FalseValue; + } + + /// + /// The current value of this + /// + public bool Value + { + get + { + Interlocked.MemoryBarrier(); + return _value == TrueValue; + } + set { Interlocked.Exchange(ref _value, value ? TrueValue : FalseValue); } + } + + /// + /// If equals , then set the Value to + /// . + /// + /// TBD + /// TBD + /// true if was set + public bool CompareAndSet(bool expected, bool newValue) + { + var expectedInt = expected ? TrueValue : FalseValue; + var newInt = newValue ? TrueValue : FalseValue; + return Interlocked.CompareExchange(ref _value, newInt, expectedInt) == expectedInt; + } + + /// + /// Atomically sets the to and returns the old . + /// + /// The new value + /// The old value + public bool GetAndSet(bool newValue) + { + return Interlocked.Exchange(ref _value, newValue ? TrueValue : FalseValue) == TrueValue; + } + + /// + /// Performs an implicit conversion from to . + /// + /// The boolean to convert + /// The result of the conversion. + public static implicit operator bool(AtomicBoolean atomicBoolean) + { + return atomicBoolean.Value; + } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Evaluation/EvalResult.cs b/src/FeatBit.ServerSdk/Evaluation/EvalResult.cs index c9d52ed..44658b7 100644 --- a/src/FeatBit.ServerSdk/Evaluation/EvalResult.cs +++ b/src/FeatBit.ServerSdk/Evaluation/EvalResult.cs @@ -1,6 +1,6 @@ namespace FeatBit.Sdk.Server.Evaluation { - public class EvalResult + internal class EvalResult { public ReasonKind Kind { get; set; } diff --git a/src/FeatBit.ServerSdk/Evaluation/Evaluator.cs b/src/FeatBit.ServerSdk/Evaluation/Evaluator.cs index d995a11..eb8a40d 100644 --- a/src/FeatBit.ServerSdk/Evaluation/Evaluator.cs +++ b/src/FeatBit.ServerSdk/Evaluation/Evaluator.cs @@ -1,4 +1,6 @@ using System.Linq; +using System.Runtime.CompilerServices; +using FeatBit.Sdk.Server.Events; using FeatBit.Sdk.Server.Model; using FeatBit.Sdk.Server.Store; @@ -13,31 +15,35 @@ public Evaluator(IMemoryStore store) _store = store; } - public EvalResult Evaluate(EvaluationContext context) + public (EvalResult evalResult, EvalEvent evalEvent) Evaluate(EvaluationContext context) { var storeKey = StoreKeys.ForFeatureFlag(context.FlagKey); var flag = _store.Get(storeKey); if (flag == null) { - return EvalResult.FlagNotFound; + return FlagNotFound(); } return Evaluate(flag, context.FbUser); + + (EvalResult EvalResult, EvalEvent evalEvent) FlagNotFound() => (EvalResult.FlagNotFound, null); } - public EvalResult Evaluate(FeatureFlag flag, FbUser user) + public (EvalResult evalResult, EvalEvent evalEvent) Evaluate(FeatureFlag flag, FbUser user) { + var flagKey = flag.Key; + // if flag is disabled if (!flag.IsEnabled) { var disabledVariation = flag.GetVariation(flag.DisabledVariationId); if (disabledVariation == null) { - return EvalResult.MalformedFlag; + return MalformedFlag(); } - return EvalResult.FlagOff(disabledVariation.Value); + return FlagOff(disabledVariation); } // if user is targeted @@ -45,10 +51,9 @@ public EvalResult Evaluate(FeatureFlag flag, FbUser user) if (targetUser != null) { var targetedVariation = flag.GetVariation(targetUser.VariationId); - return EvalResult.Targeted(targetedVariation.Value); + return Targeted(targetedVariation, flag.ExptIncludeAllTargets); } - var flagKey = flag.Key; string dispatchKey; // if user is rule matched @@ -64,11 +69,10 @@ public EvalResult Evaluate(FeatureFlag flag, FbUser user) var rolloutVariation = rule.Variations.FirstOrDefault(x => x.IsInRollout(dispatchKey)); if (rolloutVariation == null) { - return EvalResult.MalformedFlag; + return MalformedFlag(); } - var ruleMatchedVariation = flag.GetVariation(rolloutVariation.Id); - return EvalResult.RuleMatched(ruleMatchedVariation.Value, rule.Name); + return RuleMatched(rule, rolloutVariation); } } @@ -82,11 +86,89 @@ public EvalResult Evaluate(FeatureFlag flag, FbUser user) flag.Fallthrough.Variations.FirstOrDefault(x => x.IsInRollout(dispatchKey)); if (defaultVariation == null) { - return EvalResult.MalformedFlag; + return MalformedFlag(); + } + + return Fallthrough(); + + (EvalResult EvalResult, EvalEvent evalEvent) MalformedFlag() => (EvalResult.MalformedFlag, null); + + (EvalResult EvalResult, EvalEvent evalEvent) FlagOff(Variation variation) => + (EvalResult.FlagOff(variation.Value), new EvalEvent(user, flagKey, variation, false)); + + (EvalResult EvalResult, EvalEvent evalEvent) Targeted(Variation variation, bool exptIncludeAllTargets) => + (EvalResult.Targeted(variation.Value), new EvalEvent(user, flagKey, variation, exptIncludeAllTargets)); + + (EvalResult EvalResult, EvalEvent evalEvent) RuleMatched(TargetRule rule, RolloutVariation rolloutVariation) + { + var variation = flag.GetVariation(rolloutVariation.Id); + + var evalResult = EvalResult.RuleMatched(variation.Value, rule.Name); + + var sendToExperiment = IsSendToExperiment( + flag.ExptIncludeAllTargets, + rule.IncludedInExpt, + dispatchKey, + rolloutVariation + ); + var evalEvent = new EvalEvent(user, flagKey, variation, sendToExperiment); + + return (evalResult, evalEvent); + } + + (EvalResult EvalResult, EvalEvent evalEvent) Fallthrough() + { + var variation = flag.GetVariation(defaultVariation.Id); + + var evalResult = EvalResult.Fallthrough(variation.Value); + + var sendToExperiment = IsSendToExperiment( + flag.ExptIncludeAllTargets, + flag.Fallthrough.IncludedInExpt, + dispatchKey, + defaultVariation + ); + var evalEvent = new EvalEvent(user, flagKey, variation, sendToExperiment); + + return (evalResult, evalEvent); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static bool IsSendToExperiment( + bool exptIncludeAllTargets, + bool thisRuleIncludeInExpt, + string dispatchKey, + RolloutVariation rolloutVariation) + { + if (exptIncludeAllTargets) + { + return true; + } + + if (!thisRuleIncludeInExpt) + { + return false; + } + + // create a new key to calculate the experiment dispatch percentage + const string exptDispatchKeyPrefix = "expt"; + var sendToExptKey = $"{exptDispatchKeyPrefix}{dispatchKey}"; + + var exptRollout = rolloutVariation.ExptRollout; + var dispatchRollout = rolloutVariation.DispatchRollout(); + if (exptRollout == 0.0 || dispatchRollout == 0.0) + { + return false; + } + + var upperBound = exptRollout / dispatchRollout; + if (upperBound > 1.0) + { + upperBound = 1.0; } - var defaultRuleVariation = flag.GetVariation(defaultVariation.Id); - return EvalResult.Fallthrough(defaultRuleVariation.Value); + return DispatchAlgorithm.IsInRollout(sendToExptKey, new[] { 0.0, upperBound }); } } } \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Evaluation/IEvaluator.cs b/src/FeatBit.ServerSdk/Evaluation/IEvaluator.cs index c9f3677..ba9fd42 100644 --- a/src/FeatBit.ServerSdk/Evaluation/IEvaluator.cs +++ b/src/FeatBit.ServerSdk/Evaluation/IEvaluator.cs @@ -1,11 +1,12 @@ +using FeatBit.Sdk.Server.Events; using FeatBit.Sdk.Server.Model; namespace FeatBit.Sdk.Server.Evaluation { internal interface IEvaluator { - EvalResult Evaluate(EvaluationContext context); + (EvalResult evalResult, EvalEvent evalEvent) Evaluate(EvaluationContext context); - EvalResult Evaluate(FeatureFlag flag, FbUser user); + (EvalResult evalResult, EvalEvent evalEvent) Evaluate(FeatureFlag flag, FbUser user); } } \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DefaultEventBuffer.cs b/src/FeatBit.ServerSdk/Events/DefaultEventBuffer.cs new file mode 100644 index 0000000..28dac62 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DefaultEventBuffer.cs @@ -0,0 +1,35 @@ +using System.Collections.Generic; + +namespace FeatBit.Sdk.Server.Events +{ + internal sealed class DefaultEventBuffer : IEventBuffer + { + private readonly int _capacity; + private readonly List _events; + + public DefaultEventBuffer(int capacity) + { + _capacity = capacity; + _events = new List(); + } + + public bool AddEvent(IEvent @event) + { + if (_events.Count >= _capacity) + { + return false; + } + + _events.Add(@event); + return true; + } + + public int Count => _events.Count; + + public bool IsEmpty => Count == 0; + + public void Clear() => _events.Clear(); + + public IEvent[] EventsSnapshot => _events.ToArray(); + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DefaultEventDispatcher.cs b/src/FeatBit.ServerSdk/Events/DefaultEventDispatcher.cs new file mode 100644 index 0000000..c47a62f --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DefaultEventDispatcher.cs @@ -0,0 +1,198 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using FeatBit.Sdk.Server.Concurrent; +using FeatBit.Sdk.Server.Options; +using Microsoft.Extensions.Logging; + +namespace FeatBit.Sdk.Server.Events +{ + internal sealed partial class DefaultEventDispatcher : IEventDispatcher + { + private readonly int _maxFlushWorkers; + private readonly int _maxEventPerRequest; + private readonly CountdownEvent _flushWorkersCounter; + private readonly object _flushWorkerCounterLock = new object(); + + private readonly IEventBuffer _buffer; + private readonly IEventSender _sender; + private readonly IEventSerializer _serializer; + + private readonly AtomicBoolean _stopped; + internal bool HasStopped => _stopped.Value; // internal for testing + + private readonly ILogger _logger; + + internal DefaultEventDispatcher( + FbOptions options, + BlockingCollection queue, + IEventBuffer buffer = null, + IEventSender sender = null, + IEventSerializer serializer = null) + { + _maxFlushWorkers = options.MaxFlushWorker; + _maxEventPerRequest = options.MaxEventPerRequest; + _flushWorkersCounter = new CountdownEvent(1); + + _logger = options.LoggerFactory.CreateLogger(); + + // Here we use TaskFactory.StartNew instead of Task.Run() because that allows us to specify the + // LongRunning option. This option tells the task scheduler that the task is likely to hang on + // to a thread for a long time, so it should consider growing the thread pool. + Task.Factory.StartNew( + () => DispatchLoop(queue), + TaskCreationOptions.LongRunning + ); + + _buffer = buffer ?? new DefaultEventBuffer(options.MaxEventsInQueue); + _sender = sender ?? new DefaultEventSender(options); + _serializer = serializer ?? new DefaultEventSerializer(); + _stopped = new AtomicBoolean(); + } + + private void DispatchLoop(BlockingCollection queue) + { + Log.StartDispatchLoop(_logger); + + var running = true; + while (running) + { + try + { + var @event = queue.Take(); + switch (@event) + { + case PayloadEvent pe: + AddEventToBuffer(pe); + break; + case FlushEvent fe: + TriggerFlush(fe); + break; + case ShutdownEvent se: + WaitForFlushes(); + running = false; + se.Complete(); + break; + } + } + catch (Exception ex) + { + Log.DispatchError(_logger, ex); + } + } + + Log.FinishDispatchLoop(_logger); + } + + private void AddEventToBuffer(IEvent @event) + { + if (_stopped.Value) + { + return; + } + + if (_buffer.AddEvent(@event)) + { + Log.AddedEventToBuffer(_logger); + } + else + { + Log.ExceededCapacity(_logger); + } + } + + private void TriggerFlush(AsyncEvent @event) + { + if (_stopped.Value) + { + @event.Complete(); + return; + } + + if (_buffer.IsEmpty) + { + Log.FlushEmptyBuffer(_logger); + // There are no events to flush. If we don't complete the message, then the async task may never + // complete (if it had a non-zero positive timeout, then it would complete after the timeout). + @event.Complete(); + return; + } + + lock (_flushWorkerCounterLock) + { + // Note that this counter will be 1, not 0, when there are no active flush workers. + // This is because a .NET CountdownEvent can't be reused without explicitly resetting + // it once it has gone to zero. + if (_flushWorkersCounter.CurrentCount >= _maxFlushWorkers + 1) + { + Log.TooManyFlushWorkers(_logger); + // We already have too many workers, so just leave the events as is + @event.Complete(); + return; + } + + // We haven't hit the limit, we'll go ahead and start a flush task + _flushWorkersCounter.AddCount(1); + } + + // get events snapshot then clear original buffer + var snapshot = _buffer.EventsSnapshot; + _buffer.Clear(); + + Task.Run(async () => + { + try + { + await FlushEventsAsync(snapshot); + } + finally + { + _flushWorkersCounter.Signal(); + @event.Complete(); + Log.EventsFlushed(_logger, snapshot.Length); + } + }); + } + + private async Task FlushEventsAsync(IEvent[] events) + { + var memory = new ReadOnlyMemory(events); + + // split and send + var total = events.Length; + for (var i = 0; i < total; i += _maxEventPerRequest) + { + var length = Math.Min(_maxEventPerRequest, total - i); + var slice = memory.Slice(i, length); + var payload = _serializer.Serialize(slice); + + var deliveryStatus = await _sender.SendAsync(payload); + if (deliveryStatus == DeliveryStatus.FailedAndMustShutDown) + { + _stopped.CompareAndSet(false, true); + } + } + } + + private void WaitForFlushes() + { + if (_stopped.GetAndSet(true)) + { + return; + } + + // Our CountdownEvent was initialized with a count of 1, so that's the lowest it can be at this point. + // Drop the count to zero if there are no active flush tasks. + _flushWorkersCounter.Signal(); + // Wait until it is zero. + _flushWorkersCounter.Wait(); + _flushWorkersCounter.Reset(1); + } + + public void Dispose() + { + _flushWorkersCounter?.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DefaultEventDispather.Log.cs b/src/FeatBit.ServerSdk/Events/DefaultEventDispather.Log.cs new file mode 100644 index 0000000..eef570e --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DefaultEventDispather.Log.cs @@ -0,0 +1,36 @@ +using System; +using Microsoft.Extensions.Logging; + +namespace FeatBit.Sdk.Server.Events; + +internal sealed partial class DefaultEventDispatcher +{ + private static partial class Log + { + [LoggerMessage(1, LogLevel.Debug, "Start dispatch loop.")] + public static partial void StartDispatchLoop(ILogger logger); + + [LoggerMessage(2, LogLevel.Debug, "Finish dispatch loop.")] + public static partial void FinishDispatchLoop(ILogger logger); + + [LoggerMessage(3, LogLevel.Debug, "Added event to buffer.")] + public static partial void AddedEventToBuffer(ILogger logger); + + [LoggerMessage(4, LogLevel.Debug, "{Count} events has been flushed.")] + public static partial void EventsFlushed(ILogger logger, int count); + + [LoggerMessage(5, LogLevel.Debug, "Flush empty buffer.")] + public static partial void FlushEmptyBuffer(ILogger logger); + + [LoggerMessage(6, LogLevel.Warning, + "Exceeded event queue capacity, event will be dropped. Increase capacity to avoid dropping events.")] + public static partial void ExceededCapacity(ILogger logger); + + [LoggerMessage(7, LogLevel.Debug, + "The number of flush workers has reached the limit. This flush event will be skipped.")] + public static partial void TooManyFlushWorkers(ILogger logger); + + [LoggerMessage(8, LogLevel.Error, "Unexpected error in event dispatcher thread.")] + public static partial void DispatchError(ILogger logger, Exception ex); + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DefaultEventProcessor.cs b/src/FeatBit.ServerSdk/Events/DefaultEventProcessor.cs new file mode 100644 index 0000000..888bc27 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DefaultEventProcessor.cs @@ -0,0 +1,129 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using FeatBit.Sdk.Server.Concurrent; +using FeatBit.Sdk.Server.Options; +using Microsoft.Extensions.Logging; + +namespace FeatBit.Sdk.Server.Events +{ + internal sealed class DefaultEventProcessor : IEventProcessor + { + private readonly BlockingCollection _eventQueue; + private readonly Timer _flushTimer; + private readonly IEventDispatcher _eventDispatcher; + private readonly ILogger _logger; + + private readonly AtomicBoolean _closed = new AtomicBoolean(); + internal bool HasClosed => _closed.Value; // internal for testing + private readonly AtomicBoolean _capacityExceeded = new AtomicBoolean(); + + public DefaultEventProcessor( + FbOptions options, + ILogger logger = null, + Func, IEventDispatcher> dispatcherFactory = null) + { + _eventQueue = new BlockingCollection(options.MaxEventsInQueue); + _flushTimer = new Timer(AutoFlush, null, options.AutoFlushInterval, options.AutoFlushInterval); + + var factory = dispatcherFactory ?? DefaultEventDispatcherFactory; + _eventDispatcher = factory(options, _eventQueue); + + _logger = logger ?? options.LoggerFactory.CreateLogger(); + } + + private static IEventDispatcher DefaultEventDispatcherFactory(FbOptions options, BlockingCollection queue) + { + return new DefaultEventDispatcher(options, queue); + } + + public bool Record(IEvent @event) + { + if (@event == null) + { + return false; + } + + try + { + if (_eventQueue.TryAdd(@event)) + { + _capacityExceeded.GetAndSet(false); + } + else + { + if (!_capacityExceeded.GetAndSet(true)) + { + // The main thread is seriously backed up with not-yet-processed events. + _logger.LogWarning( + "Events are being produced faster than they can be processed. We shouldn't see this." + ); + } + + // If the message is a flush message, then it could never be completed if we cannot + // add it to the queue. So we are going to complete it here to prevent the calling + // code from hanging indefinitely. + if (@event is FlushEvent flushEvent) + { + flushEvent.Complete(); + } + + return false; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error adding event in a queue."); + return false; + } + + return true; + } + + public void Flush() + { + Record(new FlushEvent()); + } + + public bool FlushAndWait(TimeSpan timeout) + { + var flush = new FlushEvent(); + Record(flush); + return flush.WaitForCompletion(timeout); + } + + public async Task FlushAndWaitAsync(TimeSpan timeout) + { + var flush = new FlushEvent(); + Record(flush); + return await flush.WaitForCompletionAsync(timeout); + } + + public void FlushAndClose(TimeSpan timeout) + { + if (_closed.GetAndSet(true)) + { + return; + } + + _flushTimer?.Dispose(); + _eventDispatcher?.Dispose(); + + Record(new FlushEvent()); + + var shutdown = new ShutdownEvent(); + Record(shutdown); + + shutdown.WaitForCompletion(timeout); + + _eventQueue.CompleteAdding(); + _eventQueue.Dispose(); + } + + private void AutoFlush(object stateInfo) + { + Flush(); + } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DefaultEventSender.Log.cs b/src/FeatBit.ServerSdk/Events/DefaultEventSender.Log.cs new file mode 100644 index 0000000..f0cbf8a --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DefaultEventSender.Log.cs @@ -0,0 +1,26 @@ +using System; +using Microsoft.Extensions.Logging; + +namespace FeatBit.Sdk.Server.Events +{ + internal partial class DefaultEventSender + { + private static partial class Log + { + [LoggerMessage(1, LogLevel.Debug, "Start send event: {Body}")] + public static partial void SendStarted(ILogger logger, string body); + + [LoggerMessage(2, LogLevel.Debug, "Event delivery took {ElapsedMs} ms, response status {Status}.")] + public static partial void SendFinished(ILogger logger, long elapsedMs, int status); + + [LoggerMessage(3, LogLevel.Debug, "Event sending task was cancelled due to a handle timeout.")] + public static partial void SendTaskWasCanceled(ILogger logger); + + [LoggerMessage(4, LogLevel.Debug, "Exception occurred when sending event.")] + public static partial void ErrorSendEvent(ILogger logger, Exception ex); + + [LoggerMessage(5, LogLevel.Warning, "Send event failed: {Reason}.")] + public static partial void SendFailed(ILogger logger, string reason); + } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DefaultEventSender.cs b/src/FeatBit.ServerSdk/Events/DefaultEventSender.cs new file mode 100644 index 0000000..8739d98 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DefaultEventSender.cs @@ -0,0 +1,144 @@ +using System; +using System.Diagnostics; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using FeatBit.Sdk.Server.Http; +using FeatBit.Sdk.Server.Options; +using Microsoft.Extensions.Logging; + +namespace FeatBit.Sdk.Server.Events +{ + internal partial class DefaultEventSender : IEventSender + { + private static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(1); + private static readonly TimeSpan DefaultReadTimeout = TimeSpan.FromSeconds(1); + private static readonly TimeSpan DefaultTimeout = DefaultConnectTimeout + DefaultReadTimeout; + + private readonly Uri _eventUri; + private const string EventPath = "/api/public/insight/track"; + private readonly int _maxAttempts; + private readonly TimeSpan _retryInterval; + + private readonly HttpClient _httpClient; + + private static readonly MediaTypeHeaderValue JsonContentType = new MediaTypeHeaderValue("application/json") + { + CharSet = "utf-8" + }; + + private readonly Stopwatch _stopwatch = new Stopwatch(); + private readonly ILogger _logger; + + public DefaultEventSender(FbOptions options, HttpClient httpClient = null) + { + _httpClient = httpClient ?? NewHttpClient(); + AddDefaultHeaders(options); + + _eventUri = new Uri(options.EventUri, EventPath); + _maxAttempts = options.MaxSendEventAttempts; + _retryInterval = options.SendEventRetryInterval; + + _logger = options.LoggerFactory.CreateLogger(); + } + + public async Task SendAsync(byte[] payload) + { + for (var attempt = 0; attempt < _maxAttempts; attempt++) + { + if (attempt > 0) + { + await Task.Delay(_retryInterval); + } + + bool isRecoverable; + string error; + using var cts = new CancellationTokenSource(DefaultTimeout); + + try + { + var response = await SendCoreAsync(payload, cts); + if (response.IsSuccessStatusCode) + { + return DeliveryStatus.Succeeded; + } + + error = response.ReasonPhrase; + isRecoverable = HttpErrors.IsRecoverable((int)response.StatusCode); + } + catch (TaskCanceledException ex) + { + if (ex.CancellationToken == cts.Token) + { + Log.SendTaskWasCanceled(_logger); + + // The task was canceled due to a handle timeout, do not retry it. + return DeliveryStatus.Failed; + } + + // Otherwise this was a request timeout. + isRecoverable = true; + error = "request timeout"; + } + catch (Exception ex) + { + Log.ErrorSendEvent(_logger, ex); + isRecoverable = true; + error = ex.Message; + } + + Log.SendFailed(_logger, error); + if (!isRecoverable) + { + return DeliveryStatus.FailedAndMustShutDown; + } + } + + Log.SendFailed(_logger, "Reconnect retries have been exhausted after max failed attempts."); + return DeliveryStatus.Failed; + } + + private async Task SendCoreAsync(byte[] payload, CancellationTokenSource cts) + { + // check log level to avoid unnecessary string allocation + if (_logger.IsEnabled(LogLevel.Trace)) + { + var body = Encoding.UTF8.GetString(payload); + Log.SendStarted(_logger, body); + } + + using var content = new ByteArrayContent(payload); + content.Headers.ContentType = JsonContentType; + + _stopwatch.Restart(); + using var response = await _httpClient.PostAsync(_eventUri, content, cts.Token); + _stopwatch.Stop(); + + Log.SendFinished(_logger, _stopwatch.ElapsedMilliseconds, (int)response.StatusCode); + + return response; + } + + private static HttpClient NewHttpClient() + { +#if NETCOREAPP || NET6_0 + var handler = new SocketsHttpHandler + { + ConnectTimeout = DefaultConnectTimeout + }; +#else + var handler = new HttpClientHandler(); +#endif + var client = new HttpClient(handler, false); + return client; + } + + private void AddDefaultHeaders(FbOptions options) + { + _httpClient.DefaultRequestHeaders.Add("Authorization", options.EnvSecret); + _httpClient.DefaultRequestHeaders.Add("User-Agent", "fb-dotnet-server-sdk"); + } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DefaultEventSerializer.cs b/src/FeatBit.ServerSdk/Events/DefaultEventSerializer.cs new file mode 100644 index 0000000..34bf47a --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DefaultEventSerializer.cs @@ -0,0 +1,101 @@ +using System; +using System.IO; +using System.Text.Json; +using FeatBit.Sdk.Server.Model; + +namespace FeatBit.Sdk.Server.Events +{ + internal class DefaultEventSerializer : IEventSerializer + { + public byte[] Serialize(IEvent @event) + { + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + WriteEvent(@event, writer); + + writer.Flush(); + return stream.ToArray(); + } + + public byte[] Serialize(ReadOnlyMemory events) + { + var span = events.Span; + + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + + writer.WriteStartArray(); + for (var i = 0; i < span.Length; i++) + { + WriteEvent(span[i], writer); + } + + writer.WriteEndArray(); + + writer.Flush(); + return stream.ToArray(); + } + + private static void WriteEvent(IEvent ue, Utf8JsonWriter writer) + { + switch (ue) + { + case EvalEvent ee: + WriteEvalEvent(ee, writer); + break; + } + } + + private static void WriteEvalEvent(EvalEvent ee, Utf8JsonWriter writer) + { + writer.WriteStartObject(); + + WriteUser(ee.User, writer); + + writer.WriteStartArray("variations"); + + writer.WriteStartObject(); + writer.WriteString("featureFlagKey", ee.FlagKey); + WriteVariation(ee.Variation, writer); + writer.WriteNumber("timestamp", ee.Timestamp); + writer.WriteBoolean("sendToExperiment", ee.SendToExperiment); + writer.WriteEndObject(); + + writer.WriteEndArray(); + + writer.WriteEndObject(); + } + + private static void WriteVariation(Variation variation, Utf8JsonWriter writer) + { + writer.WriteStartObject("variation"); + + writer.WriteString("id", variation.Id); + writer.WriteString("value", variation.Value); + + writer.WriteEndObject(); + } + + private static void WriteUser(FbUser user, Utf8JsonWriter writer) + { + writer.WriteStartObject("user"); + + writer.WriteString("keyId", user.Key); + writer.WriteString("name", user.Name); + + writer.WriteStartArray("customizedProperties"); + foreach (var kv in user.Custom) + { + writer.WriteStartObject(); + writer.WriteString("name", kv.Key); + writer.WriteString("value", kv.Value); + writer.WriteEndObject(); + } + + writer.WriteEndArray(); + + writer.WriteEndObject(); + } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/DeliveryStatus.cs b/src/FeatBit.ServerSdk/Events/DeliveryStatus.cs new file mode 100644 index 0000000..d1513d3 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/DeliveryStatus.cs @@ -0,0 +1,9 @@ +namespace FeatBit.Sdk.Server.Events +{ + internal enum DeliveryStatus + { + Succeeded, + Failed, + FailedAndMustShutDown + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/IEvent.cs b/src/FeatBit.ServerSdk/Events/IEvent.cs new file mode 100644 index 0000000..35eceb8 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/IEvent.cs @@ -0,0 +1,85 @@ +using System; +using System.Threading.Tasks; +using FeatBit.Sdk.Server.Model; + +namespace FeatBit.Sdk.Server.Events +{ + internal interface IEvent + { + } + + internal abstract class AsyncEvent : IEvent + { + private readonly TaskCompletionSource _innerTcs; + private readonly Task _innerTask; + + public bool IsCompleted => _innerTask.IsCompleted; + + internal AsyncEvent() + { + _innerTcs = new TaskCompletionSource(); + _innerTask = _innerTcs.Task; + } + + internal bool WaitForCompletion(TimeSpan timeout) + { + if (timeout <= TimeSpan.Zero) + { + _innerTask.Wait(); + return true; + } + + return _innerTask.Wait(timeout); + } + + internal Task WaitForCompletionAsync(TimeSpan timeout) + { + if (timeout <= TimeSpan.Zero) + { + return _innerTask; + } + + var timeoutTask = Task.Delay(timeout).ContinueWith(_ => false); + return Task.WhenAny(_innerTask, timeoutTask).Result; + } + + internal void Complete() + { + _innerTcs.SetResult(true); + } + } + + internal sealed class FlushEvent : AsyncEvent + { + } + + internal sealed class ShutdownEvent : AsyncEvent + { + } + + internal class PayloadEvent : IEvent + { + } + + internal sealed class EvalEvent : PayloadEvent + { + public FbUser User { get; set; } + + public string FlagKey { get; set; } + + public long Timestamp { get; set; } + + public Variation Variation { get; set; } + + public bool SendToExperiment { get; set; } + + public EvalEvent(FbUser user, string flagKey, Variation variation, bool sendToExperiment) + { + User = user; + FlagKey = flagKey; + Variation = variation; + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + SendToExperiment = sendToExperiment; + } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/IEventBuffer.cs b/src/FeatBit.ServerSdk/Events/IEventBuffer.cs new file mode 100644 index 0000000..7a97fc6 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/IEventBuffer.cs @@ -0,0 +1,13 @@ +namespace FeatBit.Sdk.Server.Events +{ + internal interface IEventBuffer + { + bool AddEvent(IEvent @event); + + bool IsEmpty { get; } + + void Clear(); + + IEvent[] EventsSnapshot { get; } + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/IEventDispatcher.cs b/src/FeatBit.ServerSdk/Events/IEventDispatcher.cs new file mode 100644 index 0000000..ff723a7 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/IEventDispatcher.cs @@ -0,0 +1,8 @@ +using System; + +namespace FeatBit.Sdk.Server.Events +{ + internal interface IEventDispatcher : IDisposable + { + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/IEventProcessor.cs b/src/FeatBit.ServerSdk/Events/IEventProcessor.cs new file mode 100644 index 0000000..98075af --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/IEventProcessor.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading.Tasks; + +namespace FeatBit.Sdk.Server.Events +{ + /// + /// Represents a processor that can process events. + /// + internal interface IEventProcessor + { + /// + /// Records an . + /// + /// The event to be recorded. + /// A boolean value indicating whether the operation succeeded or not. + bool Record(IEvent @event); + + /// + /// Triggers an asynchronous event flush. + /// + void Flush(); + + /// + /// Blocking version of . + /// + /// maximum time to wait; zero or negative timeout means indefinitely + /// true if completed, false if timed out + bool FlushAndWait(TimeSpan timeout); + + /// + /// Asynchronous version of . + /// + /// + /// The difference between this and is that you can await the task to simulate + /// blocking behavior. + /// + /// maximum time to wait; zero or negative timeout means indefinitely + /// a task that resolves to true if completed, false if timed out + Task FlushAndWaitAsync(TimeSpan timeout); + + /// + /// Flush all events and close this processor. + /// + /// maximum time to wait; zero or negative timeout means indefinitely + /// true if completed, false if timed out + void FlushAndClose(TimeSpan timeout); + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/IEventSender.cs b/src/FeatBit.ServerSdk/Events/IEventSender.cs new file mode 100644 index 0000000..c63705a --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/IEventSender.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace FeatBit.Sdk.Server.Events +{ + internal interface IEventSender + { + Task SendAsync(byte[] payload); + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Events/IEventSerializer.cs b/src/FeatBit.ServerSdk/Events/IEventSerializer.cs new file mode 100644 index 0000000..c564a65 --- /dev/null +++ b/src/FeatBit.ServerSdk/Events/IEventSerializer.cs @@ -0,0 +1,11 @@ +using System; + +namespace FeatBit.Sdk.Server.Events +{ + internal interface IEventSerializer + { + public byte[] Serialize(IEvent @event); + + public byte[] Serialize(ReadOnlyMemory events); + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/FbClient.cs b/src/FeatBit.ServerSdk/FbClient.cs index 8230da9..7a8d8e7 100644 --- a/src/FeatBit.ServerSdk/FbClient.cs +++ b/src/FeatBit.ServerSdk/FbClient.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using FeatBit.Sdk.Server.DataSynchronizer; using FeatBit.Sdk.Server.Evaluation; +using FeatBit.Sdk.Server.Events; using FeatBit.Sdk.Server.Model; using FeatBit.Sdk.Server.Options; using FeatBit.Sdk.Server.Store; @@ -17,7 +18,8 @@ public sealed class FbClient private readonly FbOptions _options; private readonly IMemoryStore _store; private readonly IDataSynchronizer _dataSynchronizer; - private readonly Evaluator _evaluator; + private readonly IEvaluator _evaluator; + private readonly IEventProcessor _eventProcessor; private readonly ILogger _logger; #endregion @@ -104,6 +106,7 @@ public FbClient(FbOptions options) _store = new DefaultMemoryStore(); _dataSynchronizer = new WebSocketDataSynchronizer(options, _store); _evaluator = new Evaluator(_store); + _eventProcessor = new DefaultEventProcessor(options); _logger = options.LoggerFactory.CreateLogger(); // starts client @@ -111,12 +114,17 @@ public FbClient(FbOptions options) } // internal use for testing - internal FbClient(FbOptions options, IMemoryStore store, IDataSynchronizer synchronizer) + internal FbClient( + FbOptions options, + IMemoryStore store, + IDataSynchronizer synchronizer, + IEventProcessor eventProcessor) { _options = options; _store = store; _dataSynchronizer = synchronizer; _evaluator = new Evaluator(_store); + _eventProcessor = eventProcessor; _logger = options.LoggerFactory.CreateLogger(); // starts client @@ -230,11 +238,12 @@ public EvalDetail StringVariationDetail(string key, FbUser user, string /// /// a given user /// an array - public EvalResult[] GetAllVariations(FbUser user) + public EvalDetail[] GetAllVariations(FbUser user) { var results = _store .Find(x => x.StoreKey.StartsWith(StoreKeys.FlagPrefix)) - .Select(flag => _evaluator.Evaluate(flag, user)) + .Select(flag => _evaluator.Evaluate(flag, user).evalResult) + .Select(x => new EvalDetail(x.Kind, x.Reason, x.Value)) .ToArray(); return results; @@ -245,7 +254,10 @@ public EvalResult[] GetAllVariations(FbUser user) /// public async Task CloseAsync() { + _logger.LogInformation("Closing FbClient..."); await _dataSynchronizer.StopAsync(); + await _eventProcessor.FlushAndWaitAsync(_options.FlushTimeout); + _logger.LogInformation("FbClient successfully closed."); } private EvalDetail EvaluateCore( @@ -266,17 +278,20 @@ private EvalDetail EvaluateCore( FbUser = user }; - var result = _evaluator.Evaluate(ctx); - if (result.Kind == ReasonKind.Error) + var (evalResult, evalEvent) = _evaluator.Evaluate(ctx); + if (evalResult.Kind == ReasonKind.Error) { // error happened when evaluate flag, return default value - return new EvalDetail(result.Kind, result.Reason, defaultValue); + return new EvalDetail(evalResult.Kind, evalResult.Reason, defaultValue); } + // record evaluation event + _eventProcessor.Record(evalEvent); + try { - var typedValue = converter(result.Value); - return new EvalDetail(result.Kind, result.Reason, typedValue); + var typedValue = converter(evalResult.Value); + return new EvalDetail(evalResult.Kind, evalResult.Reason, typedValue); } catch { diff --git a/src/FeatBit.ServerSdk/FeatBit.ServerSdk.csproj b/src/FeatBit.ServerSdk/FeatBit.ServerSdk.csproj index 86f8b2c..2ad1af6 100644 --- a/src/FeatBit.ServerSdk/FeatBit.ServerSdk.csproj +++ b/src/FeatBit.ServerSdk/FeatBit.ServerSdk.csproj @@ -48,6 +48,12 @@ + + + + + + diff --git a/src/FeatBit.ServerSdk/Http/HttpErrors.cs b/src/FeatBit.ServerSdk/Http/HttpErrors.cs new file mode 100644 index 0000000..9c6c1d1 --- /dev/null +++ b/src/FeatBit.ServerSdk/Http/HttpErrors.cs @@ -0,0 +1,21 @@ +namespace FeatBit.Sdk.Server.Http; + +internal static class HttpErrors +{ + /// + /// Returns true if this type of error could be expected to eventually resolve itself, + /// or false if it indicates a configuration problem or client logic error such that the + /// client should give up on making any further requests. + /// + /// a status code + /// true if retrying is appropriate + public static bool IsRecoverable(int status) + { + if (status is >= 400 and <= 499) + { + return status is 400 or 408 or 429; + } + + return true; + } +} \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Model/FeatureFlag.cs b/src/FeatBit.ServerSdk/Model/FeatureFlag.cs index e94a226..088decd 100644 --- a/src/FeatBit.ServerSdk/Model/FeatureFlag.cs +++ b/src/FeatBit.ServerSdk/Model/FeatureFlag.cs @@ -100,5 +100,16 @@ internal sealed class RolloutVariation public double ExptRollout { get; set; } public bool IsInRollout(string key) => DispatchAlgorithm.IsInRollout(key, Rollout); + + public double DispatchRollout() + { + if (Rollout is not { Length: 2 }) + { + // malformed rollout + return 0.0; + } + + return Rollout[1] - Rollout[0]; + } } } \ No newline at end of file diff --git a/src/FeatBit.ServerSdk/Options/FbOptions.cs b/src/FeatBit.ServerSdk/Options/FbOptions.cs index 435f21c..950ad20 100644 --- a/src/FeatBit.ServerSdk/Options/FbOptions.cs +++ b/src/FeatBit.ServerSdk/Options/FbOptions.cs @@ -29,13 +29,13 @@ public sealed class FbOptions /// /// This value must greater equal than 1 second. /// - /// Defaults to 3 seconds + /// Defaults to 5 seconds public TimeSpan StartWaitTime { get; } /// /// The connection timeout. This is the time allowed for the WebSocket client to connect to the server. /// - /// Defaults to 5 seconds + /// Defaults to 3 seconds public TimeSpan ConnectTimeout { get; } /// @@ -55,6 +55,48 @@ public sealed class FbOptions /// public TimeSpan[] ReconnectRetryDelays { get; } + /// + /// The event flush timeout. + /// + /// Defaults to 5 seconds + public TimeSpan FlushTimeout { get; set; } + + /// + /// The maximum number of flush workers. + /// + /// Defaults to Math.Min(Math.Max(Environment.ProcessorCount / 2, 1), 4) + public int MaxFlushWorker { get; set; } + + /// + /// The time interval between each flush operation. + /// + /// Defaults to 5 seconds + public TimeSpan AutoFlushInterval { get; set; } + + /// + /// The maximum number of events in queue. + /// + /// Defaults to 10_000 + public int MaxEventsInQueue { get; set; } + + /// + /// The maximum number of events per request. + /// + /// Defaults to 50 + public int MaxEventPerRequest { get; set; } + + /// + /// The maximum number of attempts to send an event before giving up. + /// + /// Defaults to 2 + public int MaxSendEventAttempts { get; set; } + + /// + /// The time interval between each retry attempt to send an event. + /// + /// Defaults to 200 milliseconds + public TimeSpan SendEventRetryInterval { get; set; } + /// /// The logger factory used by FbClient. /// @@ -80,6 +122,13 @@ internal FbOptions( TimeSpan closeTimeout, TimeSpan keepAliveInterval, TimeSpan[] reconnectRetryDelays, + int maxFlushWorker, + TimeSpan autoFlushInterval, + TimeSpan flushTimeout, + int maxEventsInQueue, + int maxEventPerRequest, + int maxSendEventAttempts, + TimeSpan sendEventRetryInterval, ILoggerFactory loggerFactory) { EnvSecret = envSecret; @@ -90,13 +139,21 @@ internal FbOptions( CloseTimeout = closeTimeout; KeepAliveInterval = keepAliveInterval; ReconnectRetryDelays = reconnectRetryDelays; + MaxFlushWorker = maxFlushWorker; + AutoFlushInterval = autoFlushInterval; + FlushTimeout = flushTimeout; + MaxEventsInQueue = maxEventsInQueue; + MaxEventPerRequest = maxEventPerRequest; + MaxSendEventAttempts = maxSendEventAttempts; + SendEventRetryInterval = sendEventRetryInterval; LoggerFactory = loggerFactory; } internal FbOptions ShallowCopy() { var newOptions = new FbOptions(EnvSecret, StreamingUri, EventUri, StartWaitTime, ConnectTimeout, - CloseTimeout, KeepAliveInterval, ReconnectRetryDelays, LoggerFactory); + CloseTimeout, KeepAliveInterval, ReconnectRetryDelays, MaxFlushWorker, AutoFlushInterval, FlushTimeout, + MaxEventsInQueue, MaxEventPerRequest, MaxSendEventAttempts, SendEventRetryInterval, LoggerFactory); return newOptions; } diff --git a/src/FeatBit.ServerSdk/Options/FbOptionsBuilder.cs b/src/FeatBit.ServerSdk/Options/FbOptionsBuilder.cs index 7c4c5e5..c39683c 100644 --- a/src/FeatBit.ServerSdk/Options/FbOptionsBuilder.cs +++ b/src/FeatBit.ServerSdk/Options/FbOptionsBuilder.cs @@ -8,34 +8,59 @@ namespace FeatBit.Sdk.Server.Options public class FbOptionsBuilder { private readonly string _envSecret; + private Uri _streamingUri; private Uri _eventUri; + private TimeSpan _startWaitTime; private TimeSpan _connectTimeout; private TimeSpan _closeTimeout; private TimeSpan _keepAliveInterval; private TimeSpan[] _reconnectRetryDelays; + + private int _maxFlushWorker; + private TimeSpan _autoFlushInterval; + private TimeSpan _flushTimeout; + private int _maxEventsInQueue; + private int _maxEventPerRequest; + private int _maxSendEventAttempts; + private TimeSpan _sendEventRetryInterval; + private ILoggerFactory _loggerFactory; public FbOptionsBuilder(string envSecret) { _envSecret = envSecret; - // default values + // uris _streamingUri = new Uri("ws://localhost:5100"); _eventUri = new Uri("http://localhost:5100"); - _startWaitTime = TimeSpan.FromSeconds(3); - _connectTimeout = TimeSpan.FromSeconds(5); + + _startWaitTime = TimeSpan.FromSeconds(5); + + // websocket configs + _connectTimeout = TimeSpan.FromSeconds(3); _closeTimeout = TimeSpan.FromSeconds(2); _keepAliveInterval = TimeSpan.FromSeconds(15); _reconnectRetryDelays = DefaultRetryPolicy.DefaultRetryDelays; + + // event configs + _maxFlushWorker = Math.Min(Math.Max(Environment.ProcessorCount / 2, 1), 4); + _autoFlushInterval = TimeSpan.FromSeconds(5); + _flushTimeout = TimeSpan.FromSeconds(5); + _maxEventsInQueue = 10_000; + _maxEventPerRequest = 50; + _maxSendEventAttempts = 2; + _sendEventRetryInterval = TimeSpan.FromMilliseconds(200); + _loggerFactory = NullLoggerFactory.Instance; } public FbOptions Build() { return new FbOptions(_envSecret, _streamingUri, _eventUri, _startWaitTime, _connectTimeout, _closeTimeout, - _keepAliveInterval, _reconnectRetryDelays, _loggerFactory); + _keepAliveInterval, _reconnectRetryDelays, _maxFlushWorker, _autoFlushInterval, _flushTimeout, + _maxEventsInQueue, _maxEventPerRequest, _maxSendEventAttempts, _sendEventRetryInterval, _loggerFactory); } public FbOptionsBuilder Steaming(Uri uri) @@ -52,12 +77,22 @@ public FbOptionsBuilder Event(Uri uri) public FbOptionsBuilder StartWaitTime(TimeSpan waitTime) { + if (_startWaitTime < _connectTimeout) + { + throw new InvalidOperationException("The start wait time must be bigger than the connect timeout."); + } + _startWaitTime = waitTime; return this; } public FbOptionsBuilder ConnectTimeout(TimeSpan timeout) { + if (_connectTimeout > _startWaitTime) + { + throw new InvalidOperationException("The connect timeout must be lower than the start wait time."); + } + _connectTimeout = timeout; return this; } @@ -68,6 +103,48 @@ public FbOptionsBuilder CloseTimeout(TimeSpan timeout) return this; } + public FbOptionsBuilder MaxFlushWorker(int maxFlushWorker) + { + _maxFlushWorker = maxFlushWorker; + return this; + } + + public FbOptionsBuilder AutoFlushInterval(TimeSpan autoFlushInterval) + { + _autoFlushInterval = autoFlushInterval; + return this; + } + + public FbOptionsBuilder FlushTimeout(TimeSpan timeout) + { + _flushTimeout = timeout; + return this; + } + + public FbOptionsBuilder MaxEventsInQueue(int maxEventsInQueue) + { + _maxEventsInQueue = maxEventsInQueue; + return this; + } + + public FbOptionsBuilder MaxEventPerRequest(int maxEventPerRequest) + { + _maxEventPerRequest = maxEventPerRequest; + return this; + } + + public FbOptionsBuilder MaxSendEventAttempts(int maxSendEventAttempts) + { + _maxSendEventAttempts = maxSendEventAttempts; + return this; + } + + public FbOptionsBuilder SendEventRetryInterval(TimeSpan sendEventRetryInterval) + { + _sendEventRetryInterval = sendEventRetryInterval; + return this; + } + public FbOptionsBuilder KeepAliveInterval(TimeSpan interval) { _keepAliveInterval = interval; diff --git a/src/FeatBit.ServerSdk/Properties/AssemblyInfo.cs b/src/FeatBit.ServerSdk/Properties/AssemblyInfo.cs index c9f2a91..e4687b3 100644 --- a/src/FeatBit.ServerSdk/Properties/AssemblyInfo.cs +++ b/src/FeatBit.ServerSdk/Properties/AssemblyInfo.cs @@ -1,3 +1,4 @@ using System.Runtime.CompilerServices; -[assembly: InternalsVisibleTo("FeatBit.ServerSdk.Tests")] \ No newline at end of file +[assembly: InternalsVisibleTo("FeatBit.ServerSdk.Tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Evaluation/EvaluatorTests.cs b/tests/FeatBit.ServerSdk.Tests/Evaluation/EvaluatorTests.cs index d7c39cb..d44ddc1 100644 --- a/tests/FeatBit.ServerSdk.Tests/Evaluation/EvaluatorTests.cs +++ b/tests/FeatBit.ServerSdk.Tests/Evaluation/EvaluatorTests.cs @@ -16,11 +16,13 @@ public void EvaluateFlagNotFound() FbUser = new FbUserBuilder("u1").Build() }; - var result = evaluator.Evaluate(context); + var (evalResult, evalEvent) = evaluator.Evaluate(context); - Assert.Equal(ReasonKind.Error, result.Kind); - Assert.Equal(string.Empty, result.Value); - Assert.Equal("flag not found", result.Reason); + Assert.Equal(ReasonKind.Error, evalResult.Kind); + Assert.Equal(string.Empty, evalResult.Value); + Assert.Equal("flag not found", evalResult.Reason); + + Assert.Null(evalEvent); } [Fact] @@ -42,11 +44,13 @@ public void EvaluateMalformedFlag() FbUser = new FbUserBuilder("u1").Build() }; - var result = evaluator.Evaluate(context); + var (evalResult, evalEvent) = evaluator.Evaluate(context); + + Assert.Equal(ReasonKind.Error, evalResult.Kind); + Assert.Equal(string.Empty, evalResult.Value); + Assert.Equal("malformed flag", evalResult.Reason); - Assert.Equal(ReasonKind.Error, result.Kind); - Assert.Equal(string.Empty, result.Value); - Assert.Equal("malformed flag", result.Reason); + Assert.Null(evalEvent); } [Fact] @@ -68,11 +72,14 @@ public void EvaluateFlagOffResult() FbUser = new FbUserBuilder("u1").Build() }; - var result = evaluator.Evaluate(context); + var (evalResult, evalEvent) = evaluator.Evaluate(context); - Assert.Equal(ReasonKind.Off, result.Kind); - Assert.Equal("true", result.Value); - Assert.Equal("flag off", result.Reason); + Assert.Equal(ReasonKind.Off, evalResult.Kind); + Assert.Equal("true", evalResult.Value); + Assert.Equal("flag off", evalResult.Reason); + + // flag is off + Assert.False(evalEvent.SendToExperiment); } [Fact] @@ -105,11 +112,14 @@ public void EvaluateTargetedResult() FbUser = new FbUserBuilder("u1").Build() }; - var result = evaluator.Evaluate(context); + var (evalResult, evalEvent) = evaluator.Evaluate(context); + + Assert.Equal(ReasonKind.TargetMatch, evalResult.Kind); + Assert.Equal("false", evalResult.Value); + Assert.Equal("target match", evalResult.Reason); - Assert.Equal(ReasonKind.TargetMatch, result.Kind); - Assert.Equal("false", result.Value); - Assert.Equal("target match", result.Reason); + // ExptIncludeAllTargets is true by default + Assert.True(evalEvent.SendToExperiment); } [Fact] @@ -135,7 +145,7 @@ public void EvaluatorRuleMatchedResult() } }, DispatchKey = "keyId", - IncludedInExpt = true, + IncludedInExpt = false, Variations = new List { new() @@ -150,6 +160,7 @@ public void EvaluatorRuleMatchedResult() var flag = new FeatureFlagBuilder() .Key("hello") .IsEnabled(true) + .ExptIncludeAllTargets(false) .Rules(customRule) .Variations(variations) .Build(); @@ -164,11 +175,14 @@ public void EvaluatorRuleMatchedResult() .Build() }; - var result = evaluator.Evaluate(context); + var (evalResult, evalEvent) = evaluator.Evaluate(context); + + Assert.Equal(ReasonKind.RuleMatch, evalResult.Kind); + Assert.Equal("true", evalResult.Value); + Assert.Equal($"match rule {customRule.Name}", evalResult.Reason); - Assert.Equal(ReasonKind.RuleMatch, result.Kind); - Assert.Equal("true", result.Value); - Assert.Equal($"match rule {customRule.Name}", result.Reason); + // customRule.IncludedInExpt is false + Assert.False(evalEvent.SendToExperiment); } [Fact] @@ -200,6 +214,7 @@ public void EvaluateFallthroughResult() var flag = new FeatureFlagBuilder() .Key("hello") .IsEnabled(true) + .ExptIncludeAllTargets(false) .Fallthrough(fallthrough) .Variations(variations) .Build(); @@ -212,10 +227,12 @@ public void EvaluateFallthroughResult() FbUser = new FbUserBuilder("u1").Build() }; - var result = evaluator.Evaluate(context); + var (evalResult, evalEvent) = evaluator.Evaluate(context); + + Assert.Equal(ReasonKind.Fallthrough, evalResult.Kind); + Assert.Equal("false", evalResult.Value); + Assert.Equal("fall through targets and rules", evalResult.Reason); - Assert.Equal(ReasonKind.Fallthrough, result.Kind); - Assert.Equal("false", result.Value); - Assert.Equal("fall through targets and rules", result.Reason); + Assert.True(evalEvent.SendToExperiment); } } \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/AsyncEventTests.cs b/tests/FeatBit.ServerSdk.Tests/Events/AsyncEventTests.cs new file mode 100644 index 0000000..14fcd5c --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/AsyncEventTests.cs @@ -0,0 +1,50 @@ +namespace FeatBit.Sdk.Server.Events; + +public class AsyncEventTests +{ + [Theory] + [ClassData(typeof(AsyncEvents))] + internal void AsyncEventIsCompleted(AsyncEvent asyncEvent) + { + Assert.False(asyncEvent.IsCompleted); + + asyncEvent.Complete(); + + Assert.True(asyncEvent.IsCompleted); + } + + [Fact] + internal void WaitForCompletion() + { + Assert.True(CompleteInTime(new FlushEvent(), 10, 100)); + Assert.False(CompleteInTime(new ShutdownEvent(), 100, 10)); + } + + [Fact] + internal async Task WaitForCompletionAsync() + { + Assert.True(await CompleteInTimeAsync(new FlushEvent(), 10, 100)); + Assert.False(await CompleteInTimeAsync(new ShutdownEvent(), 100, 10)); + } + + private static bool CompleteInTime(AsyncEvent asyncEvent, int timeToComplete, int timeout) + { + _ = Task.Delay(timeToComplete).ContinueWith(_ => asyncEvent.Complete()); + return asyncEvent.WaitForCompletion(TimeSpan.FromMilliseconds(timeout)); + } + + private static async Task CompleteInTimeAsync(AsyncEvent asyncEvent, int timeToComplete, int timeout) + { + _ = Task.Delay(timeToComplete).ContinueWith(_ => asyncEvent.Complete()); + return await asyncEvent.WaitForCompletionAsync(TimeSpan.FromMilliseconds(timeout)); + } +} + +internal class AsyncEvents : TheoryData +{ + public AsyncEvents() + { + Add(new FlushEvent()); + Add(new ShutdownEvent()); + } +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventBufferTests.cs b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventBufferTests.cs new file mode 100644 index 0000000..5c24518 --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventBufferTests.cs @@ -0,0 +1,40 @@ +namespace FeatBit.Sdk.Server.Events; + +public class DefaultEventBufferTests +{ + [Fact] + public void GetEventSnapshot() + { + var buffer = new DefaultEventBuffer(2); + + buffer.AddEvent(new IntEvent(1)); + buffer.AddEvent(new IntEvent(2)); + + var snapshot = buffer.EventsSnapshot; + + buffer.Clear(); + Assert.Equal(0, buffer.Count); + Assert.True(buffer.IsEmpty); + + // after the buffer is cleared, the snapshot should remain unchanged + Assert.Equal(2, snapshot.Length); + + Assert.Equal(1, ((IntEvent)snapshot[0]).Value); + Assert.Equal(2, ((IntEvent)snapshot[1]).Value); + } + + [Fact] + public void IgnoreNewEventAfterBufferIsFull() + { + var buffer = new DefaultEventBuffer(2); + + Assert.True(buffer.AddEvent(new IntEvent(1))); + Assert.True(buffer.AddEvent(new IntEvent(2))); + + // buffer is full, the following events should be ignored + Assert.False(buffer.AddEvent(new IntEvent(3))); + Assert.False(buffer.AddEvent(new IntEvent(4))); + + Assert.Equal(2, buffer.Count); + } +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventDispatcherTests.cs b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventDispatcherTests.cs new file mode 100644 index 0000000..f7fe8f2 --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventDispatcherTests.cs @@ -0,0 +1,145 @@ +using System.Collections.Concurrent; +using FeatBit.Sdk.Server.Options; + +namespace FeatBit.Sdk.Server.Events; + +public class DefaultEventDispatcherTests +{ + [Fact] + public void NewAndDispose() + { + var queue = new BlockingCollection(); + var options = new FbOptionsBuilder("fake-secret").Build(); + + using var dispatcher = new DefaultEventDispatcher(options, queue); + } + + [Fact] + public void AddEvent() + { + var queue = new BlockingCollection(); + var options = new FbOptionsBuilder("fake-secret").Build(); + + var mockBuffer = new Mock(); + + using var dispatcher = new DefaultEventDispatcher(options, queue, buffer: mockBuffer.Object); + + queue.Add(new IntEvent(1)); + queue.Add(new IntEvent(2)); + + mockBuffer.Verify(x => x.AddEvent(It.IsAny()), Times.Exactly(2)); + } + + [Fact] + public void Flush() + { + var queue = new BlockingCollection(); + var options = new FbOptionsBuilder("fake-secret").Build(); + + var mockSender = new Mock(); + + using var dispatcher = + new DefaultEventDispatcher(options, queue, sender: mockSender.Object); + + queue.Add(new IntEvent(1)); + queue.Add(new IntEvent(2)); + + FlushAndWaitComplete(queue); + + mockSender.Verify(x => x.SendAsync(It.IsAny()), Times.Once); + } + + [Fact] + public void FlushEmptyBuffer() + { + var queue = new BlockingCollection(); + var options = new FbOptionsBuilder("fake-secret").Build(); + + using var dispatcher = new DefaultEventDispatcher(options, queue); + + FlushAndWaitComplete(queue); + } + + [Fact] + public void Shutdown() + { + var queue = new BlockingCollection(); + var options = new FbOptionsBuilder("fake-secret").Build(); + + using var dispatcher = new DefaultEventDispatcher(options, queue); + + var shutdownEvent = new ShutdownEvent(); + queue.Add(shutdownEvent); + EnsureAsyncEventComplete(shutdownEvent); + } + + [Fact] + public void EventSenderStopDispatcher() + { + var queue = new BlockingCollection(); + var options = new FbOptionsBuilder("fake-secret").Build(); + + var mockSender = new Mock(); + mockSender.Setup(x => x.SendAsync(It.IsAny())) + .Returns(Task.FromResult(DeliveryStatus.FailedAndMustShutDown)); + + // Create a dispatcher with a mock sender that always fails + using var dispatcher = new DefaultEventDispatcher(options, queue, sender: mockSender.Object); + + // Add an event and flush it to trigger the sender + AddEventThenFlush(); + mockSender.Verify(x => x.SendAsync(It.IsAny()), Times.Once); + + // Clear previous invocations for later verification + mockSender.Invocations.Clear(); + + // Check if dispatcher stopped after event sender return FailedAndMustShutDown + Assert.True(dispatcher.HasStopped); + + // Check if add and flush operations are no-op after dispatcher has stopped + AddEventThenFlush(); + mockSender.Verify(x => x.SendAsync(It.IsAny()), Times.Never); + + void AddEventThenFlush() + { + queue.Add(new IntEvent(1)); + FlushAndWaitComplete(queue); + } + } + + [Theory] + [InlineData(5, 3, 1)] + [InlineData(3, 12, 4)] + [InlineData(5, 12, 3)] + public void SendEventsInMultiBatch(int eventPerRequest, int totalEvents, int expectedBatch) + { + var queue = new BlockingCollection(); + var options = new FbOptionsBuilder("fake-secret") + .MaxEventPerRequest(eventPerRequest) + .Build(); + + var mockSender = new Mock(); + + using var dispatcher = new DefaultEventDispatcher(options, queue, sender: mockSender.Object); + + for (var i = 0; i < totalEvents; i++) + { + queue.Add(new IntEvent(i)); + } + + FlushAndWaitComplete(queue); + mockSender.Verify(x => x.SendAsync(It.IsAny()), Times.Exactly(expectedBatch)); + } + + private static void FlushAndWaitComplete(BlockingCollection queue) + { + var flushEvent = new FlushEvent(); + queue.Add(flushEvent); + EnsureAsyncEventComplete(flushEvent); + } + + private static void EnsureAsyncEventComplete(AsyncEvent asyncEvent) + { + Assert.True(SpinWait.SpinUntil(() => asyncEvent.IsCompleted, 1000)); + } +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventProcessorTests.cs b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventProcessorTests.cs new file mode 100644 index 0000000..5cb8454 --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventProcessorTests.cs @@ -0,0 +1,122 @@ +using System.Collections.Concurrent; +using FeatBit.Sdk.Server.Options; +using Microsoft.Extensions.Logging; + +namespace FeatBit.Sdk.Server.Events; + +public class DefaultEventProcessorTests +{ + [Fact] + public void StartAndClose() + { + var options = new FbOptionsBuilder("secret").Build(); + var processor = new DefaultEventProcessor(options); + + // this should complete immediately + processor.FlushAndClose(TimeSpan.FromMilliseconds(100)); + + Assert.True(processor.HasClosed); + } + + [Fact] + public void CloseAnProcessorMultiTimes() + { + var options = new FbOptionsBuilder("secret").Build(); + var processor = new DefaultEventProcessor(options); + + processor.FlushAndClose(TimeSpan.FromMilliseconds(100)); + processor.FlushAndClose(TimeSpan.FromMilliseconds(100)); + } + + [Fact] + public void RecordEvent() + { + var options = new FbOptionsBuilder("secret").Build(); + var processor = new DefaultEventProcessor(options); + + Assert.True(processor.Record(new IntEvent(1))); + } + + [Fact] + public void RecordNullEvent() + { + var options = new FbOptionsBuilder("secret").Build(); + var processor = new DefaultEventProcessor(options); + + Assert.False(processor.Record(null)); + } + + [Fact] + public void ExceedCapacity() + { + var loggerMock = new Mock>(); + + var options = new FbOptionsBuilder("secret") + // set max queue size to 2 + .MaxEventsInQueue(2) + .Build(); + + // create a dispatcher that will not consume the processor's message so that processor's queue can be full + var dispatcher = new DefaultEventDispatcher(options, new BlockingCollection()); + var processor = new DefaultEventProcessor(options, loggerMock.Object, (_, _) => dispatcher); + + Assert.True(processor.Record(new IntEvent(1))); + Assert.True(processor.Record(new IntEvent(2))); + + // the processor rejects new events when its queue is full + Assert.False(processor.Record(new IntEvent(3))); + + // the processor will directly complete any flush event that is rejected. + var flushEvent = new FlushEvent(); + Assert.False(processor.Record(flushEvent)); + Assert.True(flushEvent.IsCompleted); + + // verify warning logged once + loggerMock.Verify( + logger => logger.Log( + LogLevel.Warning, + It.IsAny(), + It.IsAny(), + null, + It.IsAny>() + ), + Times.Once + ); + } + + [Fact] + public void WaitFlush() + { + var options = new FbOptionsBuilder("secret").Build(); + var mockedSender = new Mock(); + + var processor = new DefaultEventProcessor( + options, + dispatcherFactory: (opts, queue) => new DefaultEventDispatcher(opts, queue, sender: mockedSender.Object) + ); + + processor.Record(new IntEvent(1)); + var flushedInTime = processor.FlushAndWait(TimeSpan.FromMilliseconds(100)); + + Assert.True(flushedInTime); + mockedSender.Verify(x => x.SendAsync(It.IsAny()), Times.Once); + } + + [Fact] + public async Task WaitFlushAsync() + { + var options = new FbOptionsBuilder("secret").Build(); + var mockedSender = new Mock(); + + var processor = new DefaultEventProcessor( + options, + dispatcherFactory: (opts, queue) => new DefaultEventDispatcher(opts, queue, sender: mockedSender.Object) + ); + + processor.Record(new IntEvent(1)); + var flushedInTime = await processor.FlushAndWaitAsync(TimeSpan.FromMilliseconds(100)); + + Assert.True(flushedInTime); + mockedSender.Verify(x => x.SendAsync(It.IsAny()), Times.Once); + } +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSenderTests.cs b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSenderTests.cs new file mode 100644 index 0000000..5241745 --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSenderTests.cs @@ -0,0 +1,76 @@ +using System.Net; +using System.Text; +using FeatBit.Sdk.Server.Options; + +namespace FeatBit.Sdk.Server.Events; + +public class DefaultEventSenderTests +{ + [Theory] + // ok + [InlineData(HttpStatusCode.OK, DeliveryStatus.Succeeded)] + + // recoverable error + [InlineData(HttpStatusCode.TooManyRequests, DeliveryStatus.Failed)] + [InlineData(HttpStatusCode.RequestTimeout, DeliveryStatus.Failed)] + + // unrecoverable error + [InlineData(HttpStatusCode.NotFound, DeliveryStatus.FailedAndMustShutDown)] + [InlineData(HttpStatusCode.Unauthorized, DeliveryStatus.FailedAndMustShutDown)] + internal async Task CheckDeliveryStatusBasedOnServerReturns(HttpStatusCode code, DeliveryStatus status) + { + var options = new FbOptionsBuilder("secret").Build(); + + var httpClient = new HttpClient(new EventHttpMessageHandlerMock(SequencedCode)); + var sender = new DefaultEventSender(options, httpClient); + + var payload = Encoding.UTF8.GetBytes("{ \"value\": 1 }"); + var deliveryStatus = await sender.SendAsync(payload); + + Assert.Equal(status, deliveryStatus); + + HttpStatusCode SequencedCode(int _) => code; + } + + [Fact] + public async Task ReturnsOkAfterRetry() + { + var options = new FbOptionsBuilder("secret") + .SendEventRetryInterval(TimeSpan.FromMilliseconds(5)) + .Build(); + + var httpClient = new HttpClient(new EventHttpMessageHandlerMock(SequencedCode)); + var sender = new DefaultEventSender(options, httpClient); + + var payload = Encoding.UTF8.GetBytes("{ \"value\": 1 }"); + var deliveryStatus = await sender.SendAsync(payload); + + Assert.Equal(DeliveryStatus.Succeeded, deliveryStatus); + + HttpStatusCode SequencedCode(int sequence) => + sequence % 2 == 0 ? HttpStatusCode.RequestTimeout : HttpStatusCode.OK; + } +} + +internal sealed class EventHttpMessageHandlerMock : HttpMessageHandler +{ + private int _sequence; + private readonly Func _sequencedCode; + + public EventHttpMessageHandlerMock(Func sequencedCode) + { + _sequencedCode = sequencedCode; + _sequence = 0; + } + + protected override Task SendAsync(HttpRequestMessage request, + CancellationToken cancellationToken) + { + Assert.Equal("secret", request.Headers.Authorization?.Scheme); + Assert.Equal(HttpMethod.Post, request.Method); + + var code = _sequencedCode(_sequence++); + + return Task.FromResult(new HttpResponseMessage(code)); + } +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvent.verified.txt b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvent.verified.txt new file mode 100644 index 0000000..65146d1 --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvent.verified.txt @@ -0,0 +1,27 @@ +{ + user: { + keyId: u1-Id, + name: u1-name, + customizedProperties: [ + { + name: custom, + value: value + }, + { + name: country, + value: us + } + ] + }, + variations: [ + { + featureFlagKey: hello, + variation: { + id: v1Id, + value: v1 + }, + timestamp: {Scrubbed}, + sendToExperiment: true + } + ] +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvents.verified.txt b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvents.verified.txt new file mode 100644 index 0000000..544d39f --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.SerializeEvalEvents.verified.txt @@ -0,0 +1,48 @@ +[ + { + user: { + keyId: u2-Id, + name: u2-name, + customizedProperties: [ + { + name: age, + value: 10 + } + ] + }, + variations: [ + { + featureFlagKey: hello, + variation: { + id: v2Id, + value: v2 + }, + timestamp: {Scrubbed}, + sendToExperiment: false + } + ] + }, + { + user: { + keyId: u3-Id, + name: u3-name, + customizedProperties: [ + { + name: age, + value: 10 + } + ] + }, + variations: [ + { + featureFlagKey: hello, + variation: { + id: v3Id, + value: v3 + }, + timestamp: {Scrubbed}, + sendToExperiment: true + } + ] + } +] \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.cs b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.cs new file mode 100644 index 0000000..7af3a05 --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/DefaultEventSerializerTests.cs @@ -0,0 +1,74 @@ +using System.Text; +using FeatBit.Sdk.Server.Model; + +namespace FeatBit.Sdk.Server.Events; + +[UsesVerify] +public class DefaultEventSerializerTests +{ + [Fact] + public async Task SerializeEvalEvent() + { + var serializer = new DefaultEventSerializer(); + + var @event = AllEvents()[0]; + + var jsonBytes = serializer.Serialize(@event); + var json = Encoding.UTF8.GetString(jsonBytes); + + await VerifyJson(json).ScrubMember("timestamp"); + } + + [Fact] + public async Task SerializeEvalEvents() + { + var serializer = new DefaultEventSerializer(); + + var events = AllEvents(); + var result = new ReadOnlyMemory(events, 1, 2); + + var jsonBytes = serializer.Serialize(result); + var json = Encoding.UTF8.GetString(jsonBytes); + + await VerifyJson(json).ScrubMember("timestamp"); + } + + private static IEvent[] AllEvents() + { + var user1 = FbUser.Builder("u1-Id") + .Name("u1-name") + .Custom("custom", "value") + .Custom("country", "us") + .Build(); + var v1Variation = new Variation + { + Id = "v1Id", + Value = "v1" + }; + var event1 = new EvalEvent(user1, "hello", v1Variation, true); + + var user2 = FbUser.Builder("u2-Id") + .Name("u2-name") + .Custom("age", "10") + .Build(); + var v2Variation = new Variation + { + Id = "v2Id", + Value = "v2" + }; + var event2 = new EvalEvent(user2, "hello", v2Variation, false); + + var user3 = FbUser.Builder("u3-Id") + .Name("u3-name") + .Custom("age", "10") + .Build(); + var v3Variation = new Variation + { + Id = "v3Id", + Value = "v3" + }; + var event3 = new EvalEvent(user3, "hello", v3Variation, true); + + return new IEvent[] { event1, event2, event3 }; + } +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/Events/IntEvent.cs b/tests/FeatBit.ServerSdk.Tests/Events/IntEvent.cs new file mode 100644 index 0000000..0918f27 --- /dev/null +++ b/tests/FeatBit.ServerSdk.Tests/Events/IntEvent.cs @@ -0,0 +1,11 @@ +namespace FeatBit.Sdk.Server.Events; + +internal sealed class IntEvent : PayloadEvent +{ + public int Value { get; } + + public IntEvent(int value) + { + Value = value; + } +} \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/FbClientTests.cs b/tests/FeatBit.ServerSdk.Tests/FbClientTests.cs index 4dd79d1..e901d5b 100644 --- a/tests/FeatBit.ServerSdk.Tests/FbClientTests.cs +++ b/tests/FeatBit.ServerSdk.Tests/FbClientTests.cs @@ -1,5 +1,6 @@ using FeatBit.Sdk.Server.DataSynchronizer; using FeatBit.Sdk.Server.Evaluation; +using FeatBit.Sdk.Server.Events; using FeatBit.Sdk.Server.Model; using FeatBit.Sdk.Server.Options; using FeatBit.Sdk.Server.Store; @@ -40,23 +41,29 @@ public async Task CloseUninitializedFbClient() [Fact] public void GetBoolVariation() { - var client = CreateTestFbClient(); + var eventProcessorMock = new Mock(); + var client = CreateTestFbClient(eventProcessorMock.Object); var user = FbUser.Builder("u1").Build(); var variation = client.BoolVariation("returns-true", user); Assert.True(variation); + + eventProcessorMock.Verify(x => x.Record(It.IsAny()), Times.Once); } [Fact] public void GetBoolVariationDetail() { - var client = CreateTestFbClient(); + var eventProcessorMock = new Mock(); + var client = CreateTestFbClient(eventProcessorMock.Object); var user = FbUser.Builder("u1").Build(); var variationDetail = client.BoolVariationDetail("returns-true", user); Assert.True(variationDetail.Value); Assert.Equal(ReasonKind.Fallthrough, variationDetail.Kind); Assert.Equal("fall through targets and rules", variationDetail.Reason); + + eventProcessorMock.Verify(x => x.Record(It.IsAny()), Times.Once); } [Fact] @@ -74,7 +81,7 @@ public void GetAllVariations() Assert.Equal("fall through targets and rules", result0.Reason); } - private FbClient CreateTestFbClient() + private FbClient CreateTestFbClient(IEventProcessor processor = null) { var options = new FbOptionsBuilder("qJHQTVfsZUOu1Q54RLMuIQ-JtrIvNK-k-bARYicOTNQA") .Steaming(new Uri("ws://localhost/")) @@ -83,7 +90,8 @@ private FbClient CreateTestFbClient() var store = new DefaultMemoryStore(); var synchronizer = new WebSocketDataSynchronizer(options, store, op => _app.CreateFbWebSocket(op)); - var client = new FbClient(options, store, synchronizer); + var eventProcessor = processor ?? new DefaultEventProcessor(options); + var client = new FbClient(options, store, synchronizer, eventProcessor); return client; } } \ No newline at end of file diff --git a/tests/FeatBit.ServerSdk.Tests/FeatBit.ServerSdk.Tests.csproj b/tests/FeatBit.ServerSdk.Tests/FeatBit.ServerSdk.Tests.csproj index b7cebbd..251801f 100644 --- a/tests/FeatBit.ServerSdk.Tests/FeatBit.ServerSdk.Tests.csproj +++ b/tests/FeatBit.ServerSdk.Tests/FeatBit.ServerSdk.Tests.csproj @@ -17,6 +17,7 @@ + diff --git a/tests/FeatBit.ServerSdk.Tests/Usings.cs b/tests/FeatBit.ServerSdk.Tests/Usings.cs index 8c927eb..7a8920d 100644 --- a/tests/FeatBit.ServerSdk.Tests/Usings.cs +++ b/tests/FeatBit.ServerSdk.Tests/Usings.cs @@ -1 +1,2 @@ -global using Xunit; \ No newline at end of file +global using Xunit; +global using Moq; \ No newline at end of file