Skip to content

Commit

Permalink
feat: reconnect topic subscriptions on error (#478)
Browse files Browse the repository at this point in the history
* feat: reconnect topic subscriptions on error

Instead of returning an error message, attempt to reconnect to a topic
on error by remaking the subscription.

Add a discontinuity logging message with the sequence numbers.
  • Loading branch information
nand4011 authored Sep 8, 2023
1 parent b2fc063 commit e459101
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 28 deletions.
34 changes: 34 additions & 0 deletions src/Momento.Sdk/Internal/LoggingUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,40 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess
logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName);
}
}

/// <summary>
/// Logs a message at TRACE level that indicates that a discontinuity was received.
/// </summary>
/// <param name="logger"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="lastSequenceNumber"></param>
/// <param name="newSequenceNumber"></param>
public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber);
}
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic subscription received an error.
/// </summary>
/// <typeparam name="TError"></typeparam>
/// <param name="logger"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="error"></param>
/// <returns></returns>
public static TError LogTraceTopicSubscriptionError<TError>(this ILogger logger, string cacheName, string topicName, TError error)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("An error was received by a subscription: cacheName: {}; topicName: {}; error: {}", cacheName, topicName, error);
}
return error;
}
#endif

/// <summary>
Expand Down
110 changes: 82 additions & 28 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}

AsyncServerStreamingCall<_SubscriptionItem> subscription;
SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscription = grpcManager.Client.subscribe(request, new CallOptions());
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
}
catch (Exception e)
{
Expand All @@ -132,72 +132,126 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
}

var response = new TopicSubscribeResponse.Subscription(
cancellationToken => GetNextRelevantMessageFromGrpcStreamAsync(subscription, cancellationToken, cacheName, topicName),
subscription.Dispose);
cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken),
subscriptionWrapper.Dispose);
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName,
response);
}

private async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription,
CancellationToken cancellationToken, string cacheName, string topicName)
private class SubscriptionWrapper : IDisposable
{
if (cancellationToken.IsCancellationRequested)
private readonly TopicGrpcManager _grpcManager;
private readonly string _cacheName;
private readonly string _topicName;
private readonly CacheExceptionMapper _exceptionMapper;
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem> _subscription;
private ulong? _lastSequenceNumber;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
{
return null;
_grpcManager = grpcManager;
_cacheName = cacheName;
_topicName = topicName;
_exceptionMapper = exceptionMapper;
_logger = logger;

_subscription = Subscribe();
}

try
private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
{
var request = new _SubscriptionRequest
{
CacheName = _cacheName,
Topic = _topicName
};
if (_lastSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
}

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
return _grpcManager.Client.subscribe(request, new CallOptions());
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
CancellationToken cancellationToken)
{
while (await subscription.ResponseStream.MoveNext(cancellationToken))
while (!cancellationToken.IsCancellationRequested)
{
var message = subscription.ResponseStream.Current;
try
{
await _subscription.ResponseStream.MoveNext(cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception e)
{
var sdkException = _exceptionMapper.Convert(e);
if (sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR)
{
break;
}
//TODO: Are there other errors that we should break for?

_logger.LogTraceTopicSubscriptionError(_cacheName, _topicName, sdkException);

await Task.Delay(5_000, cancellationToken);
Dispose();
_subscription = Subscribe();
continue;
}

var message = _subscription.ResponseStream.Current;

switch (message.KindCase)
{
case _SubscriptionItem.KindOneofCase.Item:
_lastSequenceNumber = message.Item.TopicSequenceNumber;
switch (message.Item.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}

break;
case _SubscriptionItem.KindOneofCase.Discontinuity:
_logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName);
_logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
break;
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
break;
case _SubscriptionItem.KindOneofCase.None:
_logger.LogTraceTopicMessageReceived("none", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName);
break;
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}
}
}
catch (OperationCanceledException)
{

return null;
}
catch (Exception e)

public void Dispose()
{
var sdkException = _exceptionMapper.Convert(e);
return sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR
? null
: new TopicMessage.Error(sdkException);
_subscription.Dispose();
}

return null;
}
}
#endif

0 comments on commit e459101

Please sign in to comment.