Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: return an error from the subscribe method for a bad stream #498

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Exceptions/InternalServerException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Momento.Sdk.Exceptions;
public class InternalServerException : SdkException
{
/// <include file="../docs.xml" path='docs/class[@name="SdkException"]/constructor/*' />
public InternalServerException(string message, MomentoErrorTransportDetails transportDetails, Exception? e = null) : base(MomentoErrorCode.INTERNAL_SERVER_ERROR, message, transportDetails, e)
public InternalServerException(string message, MomentoErrorTransportDetails? transportDetails = null, Exception? e = null) : base(MomentoErrorCode.INTERNAL_SERVER_ERROR, message, transportDetails, e)
{
this.MessageWrapper = "An unexpected error occurred while trying to fulfill the request; please contact us at [email protected]";
}
Expand Down
55 changes: 43 additions & 12 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public async Task<TopicSubscribeResponse> Subscribe(string cacheName, string top

private async Task<TopicPublishResponse> SendPublish(string cacheName, string topicName, _TopicValue value)
{
_PublishRequest request = new _PublishRequest
var request = new _PublishRequest
{
CacheName = cacheName,
Topic = topicName,
Expand Down Expand Up @@ -124,6 +124,7 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
await subscriptionWrapper.Subscribe();
}
catch (Exception e)
{
Expand All @@ -146,8 +147,9 @@ private class SubscriptionWrapper : IDisposable
private readonly CacheExceptionMapper _exceptionMapper;
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem> _subscription;
private AsyncServerStreamingCall<_SubscriptionItem>? _subscription;
private ulong? _lastSequenceNumber;
private bool _subscribed;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
Expand All @@ -157,11 +159,9 @@ public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
_topicName = topicName;
_exceptionMapper = exceptionMapper;
_logger = logger;

_subscription = Subscribe();
}

private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
public async Task Subscribe()
{
var request = new _SubscriptionRequest
{
Expand All @@ -174,7 +174,19 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
}

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
return _grpcManager.Client.subscribe(request, new CallOptions());
var subscription = _grpcManager.Client.subscribe(request, new CallOptions());

await subscription.ResponseStream.MoveNext();
var firstMessage = subscription.ResponseStream.Current;
// The first message to a new subscription will always be a heartbeat.
if (firstMessage.KindCase is not _SubscriptionItem.KindOneofCase.Heartbeat)
{
throw new InternalServerException(
$"Expected heartbeat message for topic {_topicName} on cache {_cacheName}. Got: {firstMessage.KindCase}");
}

_subscription = subscription;
_subscribed = true;
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
Expand All @@ -184,7 +196,13 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
{
try
{
await _subscription.ResponseStream.MoveNext(cancellationToken);
if (!_subscribed)
{
await Subscribe();
_subscribed = true;
}

await _subscription!.ResponseStream.MoveNext(cancellationToken);
}
catch (OperationCanceledException)
{
Expand All @@ -193,17 +211,24 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
catch (Exception e)
{
var sdkException = _exceptionMapper.Convert(e);
if (sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR)
if (sdkException.ErrorCode is MomentoErrorCode.CANCELLED_ERROR)
{
break;
}
//TODO: Are there other errors that we should break for?

_logger.LogTraceTopicSubscriptionError(_cacheName, _topicName, sdkException);

await Task.Delay(5_000, cancellationToken);
// Certain errors can never be recovered
if (!IsErrorRecoverable(sdkException))
{
return new TopicMessage.Error(sdkException);
}

// If the error is recoverable, wait and attempt to resubscribe
Dispose();
_subscription = Subscribe();
await Task.Delay(5_000, cancellationToken);

_subscribed = false;
continue;
}

Expand Down Expand Up @@ -248,9 +273,15 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
return null;
}

private static bool IsErrorRecoverable(SdkException exception)
{
return exception.ErrorCode is not (MomentoErrorCode.PERMISSION_ERROR
or MomentoErrorCode.AUTHENTICATION_ERROR);
}

public void Dispose()
{
_subscription.Dispose();
_subscription?.Dispose();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Responses/TopicSubscribeResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public async ValueTask<bool> MoveNextAsync()
return true;
case TopicMessage.Error:
Current = nextMessage;
return true;
return false;
default:
Current = null;
return false;
Expand Down
8 changes: 3 additions & 5 deletions tests/Integration/Momento.Sdk.Tests/AuthClientTopicTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public async Task GenerateDisposableTopicAuthToken_ReadOnly_HappyPath()
}
}

[Fact(Skip = "Enable when SubscribeAsync returns an error")]
[Fact]
public async Task GenerateDisposableTopicAuthToken_WriteOnly_CantSubscribe()
{
var writeOnlyTopicClient = await GetClientForTokenScope(
Expand Down Expand Up @@ -308,10 +308,9 @@ public async Task GenerateDisposableTopicAuthToken_NoCachePerms_CantPublish()
AssertPermissionError<TopicPublishResponse, TopicPublishResponse.Error>(response);
}

[Fact(Skip = "Enable when SubscribeAsync returns an error")]
[Fact]
public async Task GenerateDisposableTopicAuthToken_NoCachePerms_CantSubscribe()
{
Assert.True(false);
var noCachePermsClient = await GetClientForTokenScope(
DisposableTokenScopes.TopicPublishSubscribe("notthecacheyourelookingfor", topicName)
);
Expand All @@ -329,10 +328,9 @@ public async Task GenerateDisposableTopicAuthToken_NoTopicPerms_CantPublish()
AssertPermissionError<TopicPublishResponse, TopicPublishResponse.Error>(response);
}

[Fact(Skip = "Enable when SubscribeAsync returns an error")]
[Fact]
public async Task GenerateDisposableTopicAuthToken_NoTopicPerms_CantSubscribe()
{
Assert.True(false);
var noCachePermsClient = await GetClientForTokenScope(
DisposableTokenScopes.TopicPublishSubscribe(cacheName, "notthetopicyourelookingfor")
);
Expand Down