diff --git a/examples/DisposableTokens/DisposableTokens.csproj b/examples/DisposableTokens/DisposableTokens.csproj
new file mode 100644
index 00000000..50fc400d
--- /dev/null
+++ b/examples/DisposableTokens/DisposableTokens.csproj
@@ -0,0 +1,21 @@
+
+
+
+ Exe
+ net6.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
diff --git a/examples/DisposableTokens/Program.cs b/examples/DisposableTokens/Program.cs
new file mode 100644
index 00000000..b901bf3d
--- /dev/null
+++ b/examples/DisposableTokens/Program.cs
@@ -0,0 +1,42 @@
+using System;
+using Momento.Sdk;
+using Momento.Sdk.Auth;
+using Momento.Sdk.Auth.AccessControl;
+using Momento.Sdk.Config;
+using Momento.Sdk.Responses;
+
+ICredentialProvider authProvider = new EnvMomentoTokenProvider("MOMENTO_AUTH_TOKEN");
+
+IAuthClient client = new AuthClient(AuthConfigurations.Default.Latest(), authProvider);
+var scope = new DisposableTokenScope(Permissions: new List
+{
+ new DisposableToken.CacheItemPermission(
+ CacheRole.ReadWrite,
+ CacheSelector.ByName("cache"),
+ CacheItemSelector.AllCacheItems
+ ),
+ new DisposableToken.CachePermission(
+ CacheRole.ReadOnly,
+ CacheSelector.ByName("topsecret")
+ ),
+ new DisposableToken.TopicPermission(
+ TopicRole.PublishSubscribe,
+ CacheSelector.ByName("cache"),
+ TopicSelector.ByName("example-topic")
+ )
+});
+var tokenResponse = await client.GenerateDisposableTokenAsync(
+ scope,
+ ExpiresIn.Minutes(5)
+);
+
+if (tokenResponse is GenerateDisposableTokenResponse.Success token)
+{
+ Console.WriteLine("The generated disposable token is: " + token.AuthToken);
+ Console.WriteLine("The token endpoint is: " + token.Endpoint);
+ Console.WriteLine("The token expires at (epoch timestamp): " + token.ExpiresAt.Epoch());
+}
+else if (tokenResponse is GenerateDisposableTokenResponse.Error err)
+{
+ Console.WriteLine("Error generating disposable token: " + err.Message);
+}
diff --git a/examples/DisposableTokens/README.md b/examples/DisposableTokens/README.md
new file mode 100644
index 00000000..534144bd
--- /dev/null
+++ b/examples/DisposableTokens/README.md
@@ -0,0 +1,15 @@
+
+
+# Disposable Tokens Example
+
+This example program demonstrates how to generate disposable Momento auth tokens.
+
+# Usage
+
+The program assumes that your Momento auth token is available in the `MOMENTO_AUTH_TOKEN` environment variable:
+
+```bash
+MOMENTO_AUTH_TOKEN= dotnet run
+```
+
+The example generates a disposable expiring auth token using the enumerated permissions and expiry defined in the program and prints its attributes to the console.
diff --git a/src/Momento.Sdk/Internal/LoggingUtils.cs b/src/Momento.Sdk/Internal/LoggingUtils.cs
index 0716af7a..27bed456 100644
--- a/src/Momento.Sdk/Internal/LoggingUtils.cs
+++ b/src/Momento.Sdk/Internal/LoggingUtils.cs
@@ -450,6 +450,40 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess
logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName);
}
}
+
+ ///
+ /// Logs a message at TRACE level that indicates that a discontinuity was received.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber)
+ {
+ if (logger.IsEnabled(LogLevel.Trace))
+ {
+ logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber);
+ }
+ }
+
+ ///
+ /// Logs a message at TRACE level that indicates that a topic subscription received an error.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public static TError LogTraceTopicSubscriptionError(this ILogger logger, string cacheName, string topicName, TError error)
+ {
+ if (logger.IsEnabled(LogLevel.Trace))
+ {
+ logger.LogTrace("An error was received by a subscription: cacheName: {}; topicName: {}; error: {}", cacheName, topicName, error);
+ }
+ return error;
+ }
#endif
///
diff --git a/src/Momento.Sdk/Internal/ScsTopicClient.cs b/src/Momento.Sdk/Internal/ScsTopicClient.cs
index 19d829d8..cf035961 100644
--- a/src/Momento.Sdk/Internal/ScsTopicClient.cs
+++ b/src/Momento.Sdk/Internal/ScsTopicClient.cs
@@ -119,11 +119,11 @@ private async Task SendSubscribe(string cacheName, strin
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}
- AsyncServerStreamingCall<_SubscriptionItem> subscription;
+ SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
- subscription = grpcManager.Client.subscribe(request, new CallOptions());
+ subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
}
catch (Exception e)
{
@@ -132,72 +132,126 @@ private async Task SendSubscribe(string cacheName, strin
}
var response = new TopicSubscribeResponse.Subscription(
- cancellationToken => GetNextRelevantMessageFromGrpcStreamAsync(subscription, cancellationToken, cacheName, topicName),
- subscription.Dispose);
+ cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken),
+ subscriptionWrapper.Dispose);
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName,
response);
}
- private async ValueTask GetNextRelevantMessageFromGrpcStreamAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription,
- CancellationToken cancellationToken, string cacheName, string topicName)
+ private class SubscriptionWrapper : IDisposable
{
- if (cancellationToken.IsCancellationRequested)
+ private readonly TopicGrpcManager _grpcManager;
+ private readonly string _cacheName;
+ private readonly string _topicName;
+ private readonly CacheExceptionMapper _exceptionMapper;
+ private readonly ILogger _logger;
+
+ private AsyncServerStreamingCall<_SubscriptionItem> _subscription;
+ private ulong? _lastSequenceNumber;
+
+ public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
+ string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
{
- return null;
+ _grpcManager = grpcManager;
+ _cacheName = cacheName;
+ _topicName = topicName;
+ _exceptionMapper = exceptionMapper;
+ _logger = logger;
+
+ _subscription = Subscribe();
}
- try
+ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
+ {
+ var request = new _SubscriptionRequest
+ {
+ CacheName = _cacheName,
+ Topic = _topicName
+ };
+ if (_lastSequenceNumber != null)
+ {
+ request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
+ }
+
+ _logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
+ return _grpcManager.Client.subscribe(request, new CallOptions());
+ }
+
+ public async ValueTask GetNextRelevantMessageFromGrpcStreamAsync(
+ CancellationToken cancellationToken)
{
- while (await subscription.ResponseStream.MoveNext(cancellationToken))
+ while (!cancellationToken.IsCancellationRequested)
{
- var message = subscription.ResponseStream.Current;
+ try
+ {
+ await _subscription.ResponseStream.MoveNext(cancellationToken);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception e)
+ {
+ var sdkException = _exceptionMapper.Convert(e);
+ if (sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR)
+ {
+ break;
+ }
+ //TODO: Are there other errors that we should break for?
+
+ _logger.LogTraceTopicSubscriptionError(_cacheName, _topicName, sdkException);
+
+ await Task.Delay(5_000, cancellationToken);
+ Dispose();
+ _subscription = Subscribe();
+ continue;
+ }
+
+ var message = _subscription.ResponseStream.Current;
switch (message.KindCase)
{
case _SubscriptionItem.KindOneofCase.Item:
+ _lastSequenceNumber = message.Item.TopicSequenceNumber;
switch (message.Item.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
- _logger.LogTraceTopicMessageReceived("text", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value);
case _TopicValue.KindOneofCase.Binary:
- _logger.LogTraceTopicMessageReceived("binary", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value);
case _TopicValue.KindOneofCase.None:
default:
- _logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}
break;
case _SubscriptionItem.KindOneofCase.Discontinuity:
- _logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName);
+ _logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
+ message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
+ _lastSequenceNumber = message.Discontinuity.NewTopicSequence;
break;
case _SubscriptionItem.KindOneofCase.Heartbeat:
- _logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
break;
case _SubscriptionItem.KindOneofCase.None:
- _logger.LogTraceTopicMessageReceived("none", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName);
break;
default:
- _logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
+ _logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}
}
- }
- catch (OperationCanceledException)
- {
+
return null;
}
- catch (Exception e)
+
+ public void Dispose()
{
- var sdkException = _exceptionMapper.Convert(e);
- return sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR
- ? null
- : new TopicMessage.Error(sdkException);
+ _subscription.Dispose();
}
-
- return null;
}
}
#endif
\ No newline at end of file