Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reconnect topic subscriptions on error #478

Merged
merged 4 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/Momento.Sdk/Internal/LoggingUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,24 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess
logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName);
}
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic subscription received an error.
/// </summary>
/// <typeparam name="TError"></typeparam>
/// <param name="logger"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="error"></param>
/// <returns></returns>
public static TError LogTraceTopicSubscriptionError<TError>(this ILogger logger, string cacheName, string topicName, TError error)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("An error was received by a subscription: cacheName: {}; topicName: {}; error: {}", cacheName, topicName, error);
}
return error;
}
#endif

private static string ReadableByteString(ByteString? input)
Expand Down
109 changes: 81 additions & 28 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}

AsyncServerStreamingCall<_SubscriptionItem> subscription;
SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscription = grpcManager.Client.subscribe(request, new CallOptions());
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
}
catch (Exception e)
{
Expand All @@ -132,72 +132,125 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
}

var response = new TopicSubscribeResponse.Subscription(
cancellationToken => GetNextRelevantMessageFromGrpcStreamAsync(subscription, cancellationToken, cacheName, topicName),
subscription.Dispose);
cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken),
subscriptionWrapper.Dispose);
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName,
response);
}

private async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription,
CancellationToken cancellationToken, string cacheName, string topicName)
private class SubscriptionWrapper : IDisposable
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't do this since I wanted to keep the pr surface as small as possible, but what are people's thoughts on making this the IAsyncEnumerator that the subscription response returns?
https://github.com/momentohq/client-sdk-dotnet/blob/main/src/Momento.Sdk/Responses/TopicSubscribeResponse.cs#L67
That would mean the topic is only actually subscribed to while the enumerator is in use, but it feels like a more simple layer to encapsulate this grpc logic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that feels more natural for DotNet users then I think be cool to start with that. I'm not the most experienced in dotnet though be good get @kvcache and also maybe ask a few of our customers in slack who use dotnet what they think. Can DM you with few good targets. Also ok w/ me starting here and fast following w/ wrapper or change to add this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be no user facing change, but it would make the subscription response object cleaner.

{
if (cancellationToken.IsCancellationRequested)
private readonly TopicGrpcManager _grpcManager;
private readonly string _cacheName;
private readonly string _topicName;
private readonly CacheExceptionMapper _exceptionMapper;
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem> _subscription;
private ulong? _lastSequenceNumber;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
{
return null;
_grpcManager = grpcManager;
_cacheName = cacheName;
_topicName = topicName;
_exceptionMapper = exceptionMapper;
_logger = logger;

_subscription = Subscribe();
}

try
private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
{
var request = new _SubscriptionRequest
{
CacheName = _cacheName,
Topic = _topicName
};
if (_lastSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
}

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
return _grpcManager.Client.subscribe(request, new CallOptions());
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
CancellationToken cancellationToken)
{
while (await subscription.ResponseStream.MoveNext(cancellationToken))
while (!cancellationToken.IsCancellationRequested)
{
var message = subscription.ResponseStream.Current;
try
{
await _subscription.ResponseStream.MoveNext(cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception e)
{
var sdkException = _exceptionMapper.Convert(e);
if (sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR)
{
break;
}
//TODO: Are there other errors that we should break for?

_logger.LogTraceTopicSubscriptionError(_cacheName, _topicName, sdkException);

Dispose();
_subscription = Subscribe();
nand4011 marked this conversation as resolved.
Show resolved Hide resolved

await Task.Delay(5_000, cancellationToken);
continue;
}

var message = _subscription.ResponseStream.Current;

switch (message.KindCase)
{
case _SubscriptionItem.KindOneofCase.Item:
_lastSequenceNumber = message.Item.TopicSequenceNumber;
switch (message.Item.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}

break;
case _SubscriptionItem.KindOneofCase.Discontinuity:
_logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("discontinuity", _cacheName, _topicName);
nand4011 marked this conversation as resolved.
Show resolved Hide resolved
break;
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
break;
case _SubscriptionItem.KindOneofCase.None:
_logger.LogTraceTopicMessageReceived("none", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName);
break;
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}
}
}
catch (OperationCanceledException)
{

return null;
}
catch (Exception e)

public void Dispose()
{
var sdkException = _exceptionMapper.Convert(e);
return sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR
? null
: new TopicMessage.Error(sdkException);
_subscription.Dispose();
}

return null;
}
}
#endif