diff --git a/src/Momento.Sdk/Internal/LoggingUtils.cs b/src/Momento.Sdk/Internal/LoggingUtils.cs
index 0716af7a..27bed456 100644
--- a/src/Momento.Sdk/Internal/LoggingUtils.cs
+++ b/src/Momento.Sdk/Internal/LoggingUtils.cs
@@ -450,6 +450,40 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess
logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName);
}
}
+
+ ///
+ /// Logs a message at TRACE level that indicates that a discontinuity was received.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber)
+ {
+ if (logger.IsEnabled(LogLevel.Trace))
+ {
+ logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber);
+ }
+ }
+
+ ///
+ /// Logs a message at TRACE level that indicates that a topic subscription received an error.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static TError LogTraceTopicSubscriptionError(this ILogger logger, string cacheName, string topicName, TError error)
+ {
+ if (logger.IsEnabled(LogLevel.Trace))
+ {
+ logger.LogTrace("An error was received by a subscription: cacheName: {}; topicName: {}; error: {}", cacheName, topicName, error);
+ }
+ return error;
+ }
#endif
///
diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs
index 19d829d8..cf035961 100644
--- a/src/Momento.Sdk/Internal/ScsTopicClient.cs
+++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs
@@ -119,11 +119,11 @@ private async Task SendSubscribe(string cacheName, strin
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}
- AsyncServerStreamingCall<_SubscriptionItem> subscription;
+ SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
- subscription = grpcManager.Client.subscribe(request, new CallOptions());
+ subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
}
catch (Exception e)
{
@@ -132,72 +132,126 @@ private async Task SendSubscribe(string cacheName, strin
}
var response = new TopicSubscribeResponse.Subscription(
- cancellationToken => GetNextRelevantMessageFromGrpcStreamAsync(subscription, cancellationToken, cacheName, topicName),
- subscription.Dispose);
+ cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken),
+ subscriptionWrapper.Dispose);
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName,
response);
}
- private async ValueTask GetNextRelevantMessageFromGrpcStreamAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription,
- CancellationToken cancellationToken, string cacheName, string topicName)
+ private class SubscriptionWrapper : IDisposable
{
- if (cancellationToken.IsCancellationRequested)
+ private readonly TopicGrpcManager _grpcManager;
+ private readonly string _cacheName;
+ private readonly string _topicName;
+ private readonly CacheExceptionMapper _exceptionMapper;
+ private readonly ILogger _logger;
+
+ private AsyncServerStreamingCall<_SubscriptionItem> _subscription;
+ private ulong? _lastSequenceNumber;
+
+ public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
+ string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
{
- return null;
+ _grpcManager = grpcManager;
+ _cacheName = cacheName;
+ _topicName = topicName;
+ _exceptionMapper = exceptionMapper;
+ _logger = logger;
+
+ _subscription = Subscribe();
}
- try
+ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
+ {
+ var request = new _SubscriptionRequest
+ {
+ CacheName = _cacheName,
+ Topic = _topicName
+ };
+ if (_lastSequenceNumber != null)
+ {
+ request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
+ }
+
+ _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
+ return _grpcManager.Client.subscribe(request, new CallOptions());
+ }
+
+ public async ValueTask GetNextRelevantMessageFromGrpcStreamAsync(
+ CancellationToken cancellationToken)
{
- while (await subscription.ResponseStream.MoveNext(cancellationToken))
+ while (!cancellationToken.IsCancellationRequested)
{
- var message = subscription.ResponseStream.Current;
+ try
+ {
+ await _subscription.ResponseStream.MoveNext(cancellationToken);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception e)
+ {
+ var sdkException = _exceptionMapper.Convert(e);
+ if (sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR)
+ {
+ break;
+ }
+ //TODO: Are there other errors that we should break for?
+
+ _logger.LogTraceTopicSubscriptionError(_cacheName, _topicName, sdkException);
+
+ await Task.Delay(5_000, cancellationToken);
+ Dispose();
+ _subscription = Subscribe();
+ continue;
+ }
+
+ var message = _subscription.ResponseStream.Current;
switch (message.KindCase)
{
case _SubscriptionItem.KindOneofCase.Item:
+ _lastSequenceNumber = message.Item.TopicSequenceNumber;
switch (message.Item.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
- _logger.LogTraceTopicMessageReceived("text", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value);
case _TopicValue.KindOneofCase.Binary:
- _logger.LogTraceTopicMessageReceived("binary", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value);
case _TopicValue.KindOneofCase.None:
default:
- _logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}
break;
case _SubscriptionItem.KindOneofCase.Discontinuity:
- _logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName);
+ _logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
+ message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
+ _lastSequenceNumber = message.Discontinuity.NewTopicSequence;
break;
case _SubscriptionItem.KindOneofCase.Heartbeat:
- _logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
break;
case _SubscriptionItem.KindOneofCase.None:
- _logger.LogTraceTopicMessageReceived("none", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName);
break;
default:
- _logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}
}
- }
- catch (OperationCanceledException)
- {
+
return null;
}
- catch (Exception e)
+
+ public void Dispose()
{
- var sdkException = _exceptionMapper.Convert(e);
- return sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR
- ? null
- : new TopicMessage.Error(sdkException);
+ _subscription.Dispose();
}
-
- return null;
}
}
#endif
\ No newline at end of file